From f767654728b076666aea35520fffc651e342ed43 Mon Sep 17 00:00:00 2001 From: Matt Sicker Date: Thu, 6 May 2021 20:33:12 -0500 Subject: [PATCH] Various cleanups (#442) * Use a real exception Signed-off-by: Matt Sicker * Clean up old todo comment Signed-off-by: Matt Sicker * Remove unnecessary lines Signed-off-by: Matt Sicker * Use more appropriate types Signed-off-by: Matt Sicker * Use lambda logging and try-with-resources Signed-off-by: Matt Sicker * Maintain thread interrupt flag more consistently Signed-off-by: Matt Sicker * Adjust thread-safety for Channel. Make it coherent. * Revert some unnecessary cleanups Signed-off-by: Matt Sicker * Update src/main/java/hudson/remoting/ChunkedOutputStream.java Co-authored-by: Oleg Nenashev Co-authored-by: Jeff Thompson Co-authored-by: Oleg Nenashev --- src/main/java/hudson/remoting/Channel.java | 67 +++++++++---------- .../java/hudson/remoting/ChannelBuilder.java | 4 +- .../hudson/remoting/ChunkedOutputStream.java | 4 +- .../hudson/remoting/DaemonThreadFactory.java | 2 +- src/main/java/hudson/remoting/Engine.java | 16 +++-- .../hudson/remoting/FastPipedInputStream.java | 9 +-- .../hudson/remoting/FileSystemJarCache.java | 4 +- .../remoting/FlightRecorderInputStream.java | 2 +- .../remoting/ImportedClassLoaderTable.java | 18 ++--- .../java/hudson/remoting/JarCacheSupport.java | 1 + .../java/hudson/remoting/JarLoaderImpl.java | 2 +- src/main/java/hudson/remoting/Launcher.java | 32 ++++----- src/main/java/hudson/remoting/PingThread.java | 2 +- .../hudson/remoting/RemoteClassLoader.java | 7 +- src/main/java/hudson/remoting/Request.java | 4 +- .../remoting/SynchronousCommandTransport.java | 15 +++-- .../hudson/remoting/forward/CopyThread.java | 6 +- .../remoting/forward/PortForwarder.java | 4 +- src/main/java/hudson/remoting/jnlp/Main.java | 7 +- .../jenkinsci/remoting/nio/FifoBuffer.java | 4 ++ .../jenkinsci/remoting/nio/NioChannelHub.java | 2 + .../jenkinsci/remoting/protocol/IOHub.java | 8 +-- .../DelegatingX509ExtendedTrustManager.java | 2 - .../impl/AgentProtocolClientFilterLayer.java | 7 +- .../impl/ChannelApplicationLayer.java | 13 +++- .../impl/ConnectionHeadersFilterLayer.java | 15 +++-- .../remoting/util/SettableFuture.java | 8 +-- .../engine/HandlerLoopbackLoadStress.java | 2 +- 28 files changed, 129 insertions(+), 138 deletions(-) diff --git a/src/main/java/hudson/remoting/Channel.java b/src/main/java/hudson/remoting/Channel.java index fdf6e92ce..f0f4621da 100644 --- a/src/main/java/hudson/remoting/Channel.java +++ b/src/main/java/hudson/remoting/Channel.java @@ -53,7 +53,7 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; -import java.util.Hashtable; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -159,7 +159,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable { * Requests that are sent to the remote side for execution, yet we are waiting locally until * we hear back their responses. */ - /*package*/ final Map> pendingCalls = new Hashtable<>(); + /*package*/ final Map> pendingCalls = new ConcurrentHashMap<>(); /** * Remembers last I/O ID issued from locally to the other side, per thread. @@ -170,8 +170,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable { /** * Records the {@link Request}s being executed on this channel, sent by the remote peer. */ - /*package*/ final Map> executingCalls = - Collections.synchronizedMap(new Hashtable<>()); + /*package*/ final Map> executingCalls = new ConcurrentHashMap<>(); /** * {@link ClassLoader}s that are proxies of the remote classloaders. @@ -219,7 +218,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable { /** * Number of {@link Command} objects sent to the other side. */ - private volatile long commandsSent; + private final AtomicLong commandsSent = new AtomicLong(); /** * Number of {@link Command} objects received from the other side. @@ -227,7 +226,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable { * When a transport is functioning correctly, {@link #commandsSent} of one side * and {@link #commandsReceived} of the other side should closely match. */ - private volatile long commandsReceived; + private final AtomicLong commandsReceived = new AtomicLong(); /** * Timestamp of the last {@link Command} object sent/received, in @@ -241,7 +240,8 @@ public class Channel implements VirtualChannel, IChannel, Closeable { * without telling us anything, the {@link SocketOutputStream#write(int)} will * return right away, and the socket only really times out after 10s of minutes. */ - private volatile long lastCommandSentAt, lastCommandReceivedAt; + private final AtomicLong lastCommandSentAt = new AtomicLong(); + private final AtomicLong lastCommandReceivedAt = new AtomicLong(); /** * Timestamp of when this channel was connected/created, in @@ -595,9 +595,9 @@ protected Channel(@Nonnull ChannelBuilder settings, @Nonnull CommandTransport tr transport.setup(this, new CommandReceiver() { @Override public void handle(Command cmd) { - commandsReceived++; + commandsReceived.incrementAndGet(); long receivedAt = System.currentTimeMillis(); - lastCommandReceivedAt = receivedAt; + lastCommandReceivedAt.set(receivedAt); if (logger.isLoggable(Level.FINE)) { logger.fine("Received " + cmd); } else if (logger.isLoggable(Level.FINER)) { @@ -764,8 +764,8 @@ private ExecutorService createPipeWriterExecutor() { logger.fine("Send "+cmd); transport.write(cmd, cmd instanceof CloseCommand); - commandsSent++; - lastCommandSentAt = System.currentTimeMillis(); + commandsSent.incrementAndGet(); + lastCommandSentAt.set(System.currentTimeMillis()); } /** @@ -1001,9 +1001,7 @@ V call(Callable callable) throws IOException, T, InterruptedException { return r.retrieve(this, UserRequest.getClassLoader(callable)); // re-wrap the exception so that we can capture the stack trace of the caller. - } catch (ClassNotFoundException e) { - throw new IOException("Remote call on " + name + " failed", e); - } catch (Error e) { + } catch (ClassNotFoundException | Error e) { throw new IOException("Remote call on " + name + " failed", e); } catch (SecurityException e) { throw new IOException("Failed to deserialize response to " + request + ": " + e, e); @@ -1080,18 +1078,17 @@ public void terminate(@Nonnull IOException e) { logger.log(Level.WARNING, "Failed to close down the reader side of the transport", x); } try { - synchronized (pendingCalls) { - for (Request req : pendingCalls.values()) - req.abort(e); - pendingCalls.clear(); + for (Request req : pendingCalls.values()) { + req.abort(e); } - synchronized (executingCalls) { - for (Request r : executingCalls.values()) { - java.util.concurrent.Future f = r.future; - if (f != null) f.cancel(true); + pendingCalls.clear(); + for (Request r : executingCalls.values()) { + java.util.concurrent.Future f = r.future; + if (f != null) { + f.cancel(true); } - executingCalls.clear(); } + executingCalls.clear(); exportedObjects.abort(e); // break any object cycles into simple chains to simplify work for the garbage collector reference.clear(e); @@ -1184,7 +1181,7 @@ public synchronized void join() throws InterruptedException { // given tickets like JENKINS-20709 that talks about hangs, it seems // like a good defensive measure to periodically wake up to make sure // that the wait condition is still not met in case we don't call notifyAll correctly - wait(30*1000); + wait(TimeUnit.SECONDS.toMillis(30)); } /** @@ -1438,13 +1435,14 @@ public void dumpPerformanceCounters(PrintWriter w) throws IOException { public void dumpDiagnostics(@Nonnull PrintWriter w) throws IOException { w.printf("Channel %s%n",name); w.printf(" Created=%s%n", new Date(createdAt)); - w.printf(" Commands sent=%d%n", commandsSent); - w.printf(" Commands received=%d%n", commandsReceived); - w.printf(" Last command sent=%s%n", new Date(lastCommandSentAt)); - w.printf(" Last command received=%s%n", new Date(lastCommandReceivedAt)); + w.printf(" Commands sent=%d%n", commandsSent.get()); + w.printf(" Commands received=%d%n", commandsReceived.get()); + w.printf(" Last command sent=%s%n", new Date(lastCommandSentAt.get())); + w.printf(" Last command received=%s%n", new Date(lastCommandReceivedAt.get())); - // TODO: Synchronize when Hashtable gets replaced by a modern collection. - w.printf(" Pending calls=%d%n", pendingCalls.size()); + synchronized (pendingCalls) { + w.printf(" Pending calls=%d%n", pendingCalls.size()); + } } /** @@ -1760,11 +1758,8 @@ public void syncLocalIO() throws InterruptedException { t.setName("I/O sync: "+old); try { // no one waits for the completion of this Runnable, so not using I/O ID - pipeWriter.submit(0,new Runnable() { - @Override - public void run() { - // noop - } + pipeWriter.submit(0, () -> { + // noop }).get(); } catch (ExecutionException e) { throw new AssertionError(e); // impossible @@ -1835,7 +1830,7 @@ public ExportTable.ExportList startExportRecording() { */ public long getLastHeard() { // TODO - this is not safe against clock skew and is called from jenkins core (and potentially plugins) - return lastCommandReceivedAt; + return lastCommandReceivedAt.get(); } /*package*/ static Channel setCurrent(Channel channel) { diff --git a/src/main/java/hudson/remoting/ChannelBuilder.java b/src/main/java/hudson/remoting/ChannelBuilder.java index 3c915c44b..3ae58a627 100644 --- a/src/main/java/hudson/remoting/ChannelBuilder.java +++ b/src/main/java/hudson/remoting/ChannelBuilder.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Hashtable; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -55,7 +55,7 @@ public class ChannelBuilder { private final List decorators = new ArrayList<>(); private boolean arbitraryCallableAllowed = true; private boolean remoteClassLoadingAllowed = true; - private final Hashtable properties = new Hashtable<>(); + private final Map properties = new HashMap<>(); private ClassFilter filter = ClassFilter.DEFAULT; /** diff --git a/src/main/java/hudson/remoting/ChunkedOutputStream.java b/src/main/java/hudson/remoting/ChunkedOutputStream.java index 6cf356e49..52027a116 100644 --- a/src/main/java/hudson/remoting/ChunkedOutputStream.java +++ b/src/main/java/hudson/remoting/ChunkedOutputStream.java @@ -24,7 +24,9 @@ class ChunkedOutputStream extends OutputStream { private final OutputStream base; public ChunkedOutputStream(int frameSize, OutputStream base) { - assert 0 Short.MAX_VALUE) { + throw new IllegalArgumentException("Illegal frame size: " + frameSize); + } this.buf = new byte[frameSize]; size = 0; diff --git a/src/main/java/hudson/remoting/DaemonThreadFactory.java b/src/main/java/hudson/remoting/DaemonThreadFactory.java index 041a2b5ce..d2829b9ce 100644 --- a/src/main/java/hudson/remoting/DaemonThreadFactory.java +++ b/src/main/java/hudson/remoting/DaemonThreadFactory.java @@ -15,7 +15,7 @@ public class DaemonThreadFactory implements ThreadFactory { public Thread newThread(@Nonnull Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); - thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, "Unhandled exception in thread " + t, e)); + thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Unhandled exception in thread " + t)); return thread; } } diff --git a/src/main/java/hudson/remoting/Engine.java b/src/main/java/hudson/remoting/Engine.java index b29fad73a..d44ec1792 100644 --- a/src/main/java/hudson/remoting/Engine.java +++ b/src/main/java/hudson/remoting/Engine.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -119,7 +120,7 @@ public Thread newThread(@Nonnull final Runnable r) { r.run(); }); thread.setDaemon(true); - thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, "Uncaught exception in thread " + t, e)); + thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in thread " + t)); return thread; } }); @@ -251,7 +252,7 @@ public Engine(EngineListener listener, List hudsonUrls, String secretKey, S throw new IllegalArgumentException("No URLs given"); } setUncaughtExceptionHandler((t, e) -> { - LOGGER.log(Level.SEVERE, "Uncaught exception in Engine thread " + t, e); + LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in Engine thread " + t); interrupt(); }); } @@ -575,8 +576,11 @@ public void onOpen(Session session, EndpointConfig config) { private void onMessage(ByteBuffer message) { try { transport.receive(message); - } catch (IOException|InterruptedException x) { + } catch (IOException x) { + events.error(x); + } catch (InterruptedException x) { events.error(x); + Thread.currentThread().interrupt(); } } @Override @@ -649,7 +653,7 @@ public void closeRead() throws IOException { } catch (IOException x) { events.status(ping + " is not ready", x); } - Thread.sleep(10_000); + TimeUnit.SECONDS.sleep(10); } events.onReconnect(); } @@ -829,7 +833,7 @@ private JnlpEndpointResolver createEndpointResolver(List jenkinsUrls) { private void onConnectionRejected(String greeting) throws InterruptedException { events.error(new Exception("The server rejected the connection: " + greeting)); - Thread.sleep(10*1000); + TimeUnit.SECONDS.sleep(10); } /** @@ -851,7 +855,7 @@ private Socket connectTcp(@Nonnull JnlpAgentEndpoint endpoint) throws IOExceptio if(retry++>10) { throw e; } - Thread.sleep(1000*10); + TimeUnit.SECONDS.sleep(10); events.status(msg+" (retrying:"+retry+")",e); } } diff --git a/src/main/java/hudson/remoting/FastPipedInputStream.java b/src/main/java/hudson/remoting/FastPipedInputStream.java index 7079e655b..4cb7644cf 100644 --- a/src/main/java/hudson/remoting/FastPipedInputStream.java +++ b/src/main/java/hudson/remoting/FastPipedInputStream.java @@ -175,6 +175,7 @@ public int read(@Nonnull byte[] b, int off, int len) throws IOException { try { buffer.wait(FastPipedOutputStream.TIMEOUT); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } // Try again. @@ -204,13 +205,5 @@ static final class ClosedBy extends Throwable { ClosedBy(Throwable error) { super("The pipe was closed at...", error); } - - /** - * If the pipe was closed by inducing an error, return that error object. - */ - @Override - public Throwable getCause() { - return super.getCause(); - } } } diff --git a/src/main/java/hudson/remoting/FileSystemJarCache.java b/src/main/java/hudson/remoting/FileSystemJarCache.java index d95a91dd5..2afd1f183 100644 --- a/src/main/java/hudson/remoting/FileSystemJarCache.java +++ b/src/main/java/hudson/remoting/FileSystemJarCache.java @@ -73,7 +73,7 @@ public String toString() { protected URL lookInCache(Channel channel, long sum1, long sum2) throws IOException, InterruptedException { File jar = map(sum1, sum2); if (jar.exists()) { - LOGGER.log(Level.FINER, String.format("Jar file cache hit %16X%16X",sum1,sum2)); + LOGGER.log(Level.FINER, () -> String.format("Jar file cache hit %16X%16X",sum1,sum2)); if (touch) { Files.setLastModifiedTime(PathUtils.fileToPath(jar), FileTime.fromMillis(System.currentTimeMillis())); } @@ -112,7 +112,7 @@ protected URL retrieve(Channel channel, long sum1, long sum2) throws IOException File tmp = createTempJar(target); try { try (RemoteOutputStream o = new RemoteOutputStream(new FileOutputStream(tmp))) { - LOGGER.log(Level.FINE, String.format("Retrieving jar file %16X%16X", sum1, sum2)); + LOGGER.log(Level.FINE, () -> String.format("Retrieving jar file %16X%16X", sum1, sum2)); getJarLoader(channel).writeJarTo(sum1, sum2, o); } diff --git a/src/main/java/hudson/remoting/FlightRecorderInputStream.java b/src/main/java/hudson/remoting/FlightRecorderInputStream.java index b7fe14942..0d9939deb 100644 --- a/src/main/java/hudson/remoting/FlightRecorderInputStream.java +++ b/src/main/java/hudson/remoting/FlightRecorderInputStream.java @@ -68,7 +68,7 @@ public void run() { } }; diagnosisThread.setUncaughtExceptionHandler( - (t, e) -> LOGGER.log(Level.SEVERE, "Uncaught exception in diagnosis thread " + t, e)); + (t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in diagnosis thread " + t)); // wait up to 1 sec to grab as much data as possible diagnosisThread.start(); diff --git a/src/main/java/hudson/remoting/ImportedClassLoaderTable.java b/src/main/java/hudson/remoting/ImportedClassLoaderTable.java index cbafa15c4..94e6b79d3 100644 --- a/src/main/java/hudson/remoting/ImportedClassLoaderTable.java +++ b/src/main/java/hudson/remoting/ImportedClassLoaderTable.java @@ -25,8 +25,8 @@ import hudson.remoting.RemoteClassLoader.IClassLoader; -import java.util.Hashtable; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; /** @@ -37,7 +37,7 @@ */ final class ImportedClassLoaderTable { final Channel channel; - final Map classLoaders = new Hashtable<>(); + final Map classLoaders = new ConcurrentHashMap<>(); ImportedClassLoaderTable(Channel channel) { this.channel = channel; @@ -50,7 +50,7 @@ final class ImportedClassLoaderTable { * This method "consumes" the given oid for the purpose of reference counting. */ @Nonnull - public synchronized ClassLoader get(int oid) { + public ClassLoader get(int oid) { return get(RemoteInvocationHandler.wrap(channel, oid, IClassLoader.class, false, false, false, false)); } @@ -61,14 +61,8 @@ public synchronized ClassLoader get(int oid) { * @return Classloader instance */ @Nonnull - public synchronized ClassLoader get(@Nonnull IClassLoader classLoaderProxy) { - ClassLoader r = classLoaders.get(classLoaderProxy); - if(r==null) { - // we need to be able to use the same hudson.remoting classes, hence delegate - // to this class loader. - r = RemoteClassLoader.create(channel.baseClassLoader,classLoaderProxy); - classLoaders.put(classLoaderProxy,r); - } - return r; + public ClassLoader get(@Nonnull IClassLoader classLoaderProxy) { + // we need to be able to use the same hudson.remoting classes, hence delegate to this class loader. + return classLoaders.computeIfAbsent(classLoaderProxy, proxy -> RemoteClassLoader.create(channel.baseClassLoader, proxy)); } } diff --git a/src/main/java/hudson/remoting/JarCacheSupport.java b/src/main/java/hudson/remoting/JarCacheSupport.java index dad74e252..8c613a71a 100644 --- a/src/main/java/hudson/remoting/JarCacheSupport.java +++ b/src/main/java/hudson/remoting/JarCacheSupport.java @@ -121,6 +121,7 @@ public void run() { bailout(e); LOGGER.log(Level.WARNING, String.format("Interrupted while resolving a jar %016x%016x", sum1, sum2), e); + Thread.currentThread().interrupt(); } catch (Throwable e) { // in other general failures, we aren't retrying // TODO: or should we? diff --git a/src/main/java/hudson/remoting/JarLoaderImpl.java b/src/main/java/hudson/remoting/JarLoaderImpl.java index 4f4244330..d598e0e3b 100644 --- a/src/main/java/hudson/remoting/JarLoaderImpl.java +++ b/src/main/java/hudson/remoting/JarLoaderImpl.java @@ -48,7 +48,7 @@ public void writeJarTo(long sum1, long sum2, OutputStream sink) throws IOExcepti try { channel.notifyJar(new File(url.toURI())); } catch (URISyntaxException | IllegalArgumentException x) { - LOGGER.log(Level.WARNING, "cannot properly report " + url, x); + LOGGER.log(Level.WARNING, x, () -> "cannot properly report " + url); } } else { LOGGER.log(Level.FINE, "serving non-file URL {0}", url); diff --git a/src/main/java/hudson/remoting/Launcher.java b/src/main/java/hudson/remoting/Launcher.java index 7c479dc54..bd15e560b 100644 --- a/src/main/java/hudson/remoting/Launcher.java +++ b/src/main/java/hudson/remoting/Launcher.java @@ -92,6 +92,7 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -424,13 +425,11 @@ private SSLSocketFactory getSSLSocketFactory() if (file.isFile() && (length = file.length()) < 65536 && length > "-----BEGIN CERTIFICATE-----\n-----END CERTIFICATE-----".length()) { - FileInputStream fis = null; - try { + try (FileInputStream fis = new FileInputStream(file)) { // we do basic size validation, if there are x509 certificates that have a PEM encoding // larger // than 64kb we can revisit the upper bound. cert = new byte[(int) length]; - fis = new FileInputStream(file); int read = fis.read(cert); if (cert.length != read) { LOGGER.log(Level.WARNING, "Only read {0} bytes from {1}, expected to read {2}", @@ -439,10 +438,8 @@ private SSLSocketFactory getSSLSocketFactory() continue; } } catch (IOException e) { - LOGGER.log(Level.WARNING, "Could not read certificate from " + file, e); + LOGGER.log(Level.WARNING, e, () -> "Could not read certificate from " + file); continue; - } finally { - IOUtils.closeQuietly(fis); } } else { if (file.isFile()) { @@ -574,7 +571,7 @@ public List parseJnlpArguments() throws ParserConfigurationException, SA System.err.println("Failed to obtain "+ agentJnlpURL); e.printStackTrace(System.err); System.err.println("Waiting 10 seconds before retry"); - Thread.sleep(10*1000); + TimeUnit.SECONDS.sleep(10); // retry } finally { if (con instanceof HttpURLConnection) { @@ -616,22 +613,19 @@ static Document loadDom(InputStream is) throws ParserConfigurationException, SAX @Deprecated @SuppressFBWarnings(value = {"UNENCRYPTED_SERVER_SOCKET", "DM_DEFAULT_ENCODING"}, justification = "This is an old, insecure mechanism that should be removed. port number file should be in platform default encoding.") private void runAsTcpServer() throws IOException, InterruptedException { - // if no one connects for too long, assume something went wrong - // and avoid hanging forever - ServerSocket ss = new ServerSocket(0,1); - ss.setSoTimeout(30*1000); - - // write a port file to report the port number - try (FileWriter w = new FileWriter(tcpPortFile)) { - w.write(String.valueOf(ss.getLocalPort())); - } - // accept just one connection and that's it. // when we are done, remove the port file to avoid stale port file Socket s; - try { + try (ServerSocket ss = new ServerSocket(0,1)) { + // if no one connects for too long, assume something went wrong + // and avoid hanging forever + ss.setSoTimeout(30*1000); + + // write a port file to report the port number + try (FileWriter w = new FileWriter(tcpPortFile)) { + w.write(String.valueOf(ss.getLocalPort())); + } s = ss.accept(); - ss.close(); } finally { boolean deleted = tcpPortFile.delete(); if (!deleted) { diff --git a/src/main/java/hudson/remoting/PingThread.java b/src/main/java/hudson/remoting/PingThread.java index 89dc73b96..3f65f6c3a 100644 --- a/src/main/java/hudson/remoting/PingThread.java +++ b/src/main/java/hudson/remoting/PingThread.java @@ -92,7 +92,7 @@ public void run() { // wait until the next check long diff; while((diff = nextCheck - System.nanoTime()) > 0) { - Thread.sleep(TimeUnit.NANOSECONDS.toMillis(diff)); + TimeUnit.NANOSECONDS.sleep(diff); } } } catch (ChannelClosedException e) { diff --git a/src/main/java/hudson/remoting/RemoteClassLoader.java b/src/main/java/hudson/remoting/RemoteClassLoader.java index 9a5d09f05..5bebbd854 100644 --- a/src/main/java/hudson/remoting/RemoteClassLoader.java +++ b/src/main/java/hudson/remoting/RemoteClassLoader.java @@ -605,7 +605,10 @@ public Enumeration findResources(String name) throws IOException { // getResources2 always give us ResourceImageBoth so // .get() shouldn't block v.add(image.image.resolveURL(channel, name).get()); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Error("Failed to load resources " + name, e); + } catch (ExecutionException e) { throw new Error("Failed to load resources " + name, e); } } @@ -639,7 +642,7 @@ private void sleepForRetry() { if (RETRY_SLEEP_DURATION_MILLISECONDS > 0) { Thread.sleep(RETRY_SLEEP_DURATION_MILLISECONDS); } - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { // Not much to do if we can't sleep. Run through the tries more quickly. } } diff --git a/src/main/java/hudson/remoting/Request.java b/src/main/java/hudson/remoting/Request.java index 702ea3fc5..3263884f6 100644 --- a/src/main/java/hudson/remoting/Request.java +++ b/src/main/java/hudson/remoting/Request.java @@ -380,7 +380,7 @@ public void run() { // error return rsp = new Response<>(Request.this, id, calcLastIoId(), t); } finally { - CURRENT.set(null); + CURRENT.remove(); } if(chainCause) { rsp.chainCause(createdAt); @@ -393,7 +393,7 @@ public void run() { if (e instanceof ChannelClosedException && !logger.isLoggable(Level.FINE)) { logger.log(Level.INFO, "Failed to send back a reply to the request {0}: {1}", new Object[] {this, e}); } else { - logger.log(Level.WARNING, "Failed to send back a reply to the request " + this, e); + logger.log(Level.WARNING, e, () -> "Failed to send back a reply to the request " + this); } } finally { channel.executingCalls.remove(id); diff --git a/src/main/java/hudson/remoting/SynchronousCommandTransport.java b/src/main/java/hudson/remoting/SynchronousCommandTransport.java index 1bead796f..6154d8028 100644 --- a/src/main/java/hudson/remoting/SynchronousCommandTransport.java +++ b/src/main/java/hudson/remoting/SynchronousCommandTransport.java @@ -46,7 +46,7 @@ public ReaderThread(CommandReceiver receiver) { super("Channel reader thread: "+channel.getName()); this.receiver = receiver; setUncaughtExceptionHandler((t, e) -> { - LOGGER.log(Level.SEVERE, "Uncaught exception in SynchronousCommandTransport.ReaderThread " + t, e); + LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in SynchronousCommandTransport.ReaderThread " + t); channel.terminate(new IOException("Unexpected reader termination", e)); }); } @@ -61,9 +61,9 @@ public void run() { cmd = read(); } catch (SocketTimeoutException ex) { if (RDR_FAIL_ON_SOCKET_TIMEOUT) { - LOGGER.log(Level.SEVERE, "Socket timeout in the Synchronous channel reader." + LOGGER.log(Level.SEVERE, ex, () -> "Socket timeout in the Synchronous channel reader." + " The channel will be interrupted, because " + RDR_SOCKET_TIMEOUT_PROPERTY_NAME - + " is set", ex); + + " is set"); throw ex; } // Timeout happened during the read operation. @@ -74,7 +74,7 @@ public void run() { } catch (EOFException e) { throw new IOException("Unexpected termination of the channel", e); } catch (ClassNotFoundException e) { - LOGGER.log(Level.SEVERE, "Unable to read a command (channel " + name + ")", e); + LOGGER.log(Level.SEVERE, e, () -> "Unable to read a command (channel " + name + ")"); continue; } @@ -82,13 +82,14 @@ public void run() { } closeRead(); } catch (InterruptedException e) { - LOGGER.log(Level.SEVERE, "I/O error in channel "+name,e); + LOGGER.log(Level.SEVERE, e, () -> "I/O error in channel "+name); + Thread.currentThread().interrupt(); channel.terminate((InterruptedIOException) new InterruptedIOException().initCause(e)); } catch (IOException e) { - LOGGER.log(Level.INFO, "I/O error in channel "+name,e); + LOGGER.log(Level.INFO, e, () -> "I/O error in channel "+name); channel.terminate(e); } catch (RuntimeException | Error e) { - LOGGER.log(Level.SEVERE, "Unexpected error in channel "+name,e); + LOGGER.log(Level.SEVERE, e, () -> "Unexpected error in channel "+name); channel.terminate(new IOException("Unexpected reader termination", e)); throw e; } finally { diff --git a/src/main/java/hudson/remoting/forward/CopyThread.java b/src/main/java/hudson/remoting/forward/CopyThread.java index 107e5224a..a999fafb1 100644 --- a/src/main/java/hudson/remoting/forward/CopyThread.java +++ b/src/main/java/hudson/remoting/forward/CopyThread.java @@ -29,10 +29,10 @@ private CopyThread(String threadName, InputStream in, OutputStream out, Runnable this.out = out; setUncaughtExceptionHandler((t, e) -> { if (remainingTries > 0) { - LOGGER.log(Level.WARNING, "Uncaught exception in CopyThread " + t + ", retrying copy", e); + LOGGER.log(Level.WARNING, e, () -> "Uncaught exception in CopyThread " + t + ", retrying copy"); new CopyThread(threadName, in, out, termination, remainingTries - 1).start(); } else { - LOGGER.log(Level.SEVERE, "Uncaught exception in CopyThread " + t + ", out of retries", e); + LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in CopyThread " + t + ", out of retries"); termination.run(); } }); @@ -46,7 +46,7 @@ public void run() { while ((len = in.read(buf)) > 0) out.write(buf, 0, len); } catch (IOException e) { - LOGGER.log(Level.WARNING, "Exception while copying in thread: " + getName(), e); + LOGGER.log(Level.WARNING, e, () -> "Exception while copying in thread: " + getName()); } } } diff --git a/src/main/java/hudson/remoting/forward/PortForwarder.java b/src/main/java/hudson/remoting/forward/PortForwarder.java index a5c1675a3..b6d381d11 100644 --- a/src/main/java/hudson/remoting/forward/PortForwarder.java +++ b/src/main/java/hudson/remoting/forward/PortForwarder.java @@ -63,7 +63,7 @@ public PortForwarder(int localPort, Forwarder forwarder) throws IOException { // the caller can explicitly cancel this by doing "setDaemon(false)" setDaemon(true); setUncaughtExceptionHandler((t, e) -> { - LOGGER.log(SEVERE, "Uncaught exception in PortForwarder thread " + t, e); + LOGGER.log(SEVERE, e, () -> "Uncaught exception in PortForwarder thread " + t); try { socket.close(); } catch (IOException e1) { @@ -86,7 +86,7 @@ public void run() { new Thread("Port forwarding session from "+s.getRemoteSocketAddress()) { { setUncaughtExceptionHandler( - (t, e) -> LOGGER.log(Level.SEVERE, "Unhandled exception in port forwarding session " + t, e)); + (t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Unhandled exception in port forwarding session " + t)); } @Override public void run() { diff --git a/src/main/java/hudson/remoting/jnlp/Main.java b/src/main/java/hudson/remoting/jnlp/Main.java index a165732db..0eeeeff36 100644 --- a/src/main/java/hudson/remoting/jnlp/Main.java +++ b/src/main/java/hudson/remoting/jnlp/Main.java @@ -355,12 +355,9 @@ public Engine createEngine() { // larger // than 64kb we can revisit the upper bound. cert = new byte[(int) length]; - FileInputStream fis = new FileInputStream(file); final int read; - try { + try (FileInputStream fis = new FileInputStream(file)) { read = fis.read(cert); - } finally { - fis.close(); } if (cert.length != read) { LOGGER.log(Level.WARNING, "Only read {0} bytes from {1}, expected to read {2}", @@ -369,7 +366,7 @@ public Engine createEngine() { continue; } } catch (IOException e) { - LOGGER.log(Level.WARNING, "Could not read certificate from " + file, e); + LOGGER.log(Level.WARNING, e, () -> "Could not read certificate from " + file); continue; } } else { diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index c86daad93..ff90cba90 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -566,6 +566,7 @@ public void write(int b) throws IOException { byte[] buf = new byte[]{(byte)b}; FifoBuffer.this.write(buf); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } } @@ -575,6 +576,7 @@ public void write(@Nonnull byte[] b, int off, int len) throws IOException { try { FifoBuffer.this.write(b,off,len); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } } @@ -600,6 +602,7 @@ public int read() throws IOException { if (n==0) throw new AssertionError(); return ((int)b[0])&0xFF; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } } @@ -609,6 +612,7 @@ public int read(@Nonnull byte[] b, int off, int len) throws IOException { try { return FifoBuffer.this.read(b, off, len); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } } diff --git a/src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java b/src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java index 7d3ff1aa4..a6283eb31 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java +++ b/src/main/java/org/jenkinsci/remoting/nio/NioChannelHub.java @@ -233,6 +233,7 @@ public void writeBlock(Channel channel, byte[] bytes) throws IOException { pos+=frame; } while(hasMore); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } } @@ -480,6 +481,7 @@ protected CommandTransport makeTransport(InputStream is, OutputStream os, Mode m startedLock.wait(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw (InterruptedIOException)new InterruptedIOException().initCause(e); } diff --git a/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java b/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java index 0a7f5fe49..9e30d8402 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java @@ -496,8 +496,8 @@ public final void run() { sleepNanos); } try { - Thread.sleep(sleepNanos / 1000000L, (int) (sleepNanos % 1000000L)); - } catch (InterruptedException e1) { + TimeUnit.NANOSECONDS.sleep(sleepNanos); + } catch (InterruptedException ignored) { // ignore } } else { @@ -555,9 +555,7 @@ public void run() { } } catch (InterruptedException ex) { // interrupted - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(Level.FINE, "{0}: Interrupted", ex); - } + LOGGER.log(Level.FINE, "Interrupted", ex); } finally { watcherThread.setName(oldName); LOGGER.log(Level.FINEST, "{0}: Finished", watcherName); diff --git a/src/main/java/org/jenkinsci/remoting/protocol/cert/DelegatingX509ExtendedTrustManager.java b/src/main/java/org/jenkinsci/remoting/protocol/cert/DelegatingX509ExtendedTrustManager.java index f3d360826..c53979a5a 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/cert/DelegatingX509ExtendedTrustManager.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/cert/DelegatingX509ExtendedTrustManager.java @@ -29,14 +29,12 @@ import javax.annotation.Nonnull; import javax.net.ssl.SSLEngine; import javax.net.ssl.X509ExtendedTrustManager; -import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement; /** * An {@link X509ExtendedTrustManager} that delegates to a runtime mutable delegate {@link X509ExtendedTrustManager}. * * @since 3.0 */ -@IgnoreJRERequirement // TODO We override some methods in Java 7, so remove this ignore when baseline is Java 7 public class DelegatingX509ExtendedTrustManager extends X509ExtendedTrustManager { /** * Our delegate. diff --git a/src/main/java/org/jenkinsci/remoting/protocol/impl/AgentProtocolClientFilterLayer.java b/src/main/java/org/jenkinsci/remoting/protocol/impl/AgentProtocolClientFilterLayer.java index 2b26672be..e4e9ea661 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/impl/AgentProtocolClientFilterLayer.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/impl/AgentProtocolClientFilterLayer.java @@ -26,7 +26,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; import org.jenkinsci.remoting.protocol.FilterLayer; import org.jenkinsci.remoting.util.ByteBufferQueue; @@ -92,11 +91,7 @@ public synchronized void doSend(@Nonnull ByteBuffer data) throws IOException { sendQueue.put(data); flushSend(sendQueue); } else { - try { - next().doSend(data); - } catch (IOException e) { - throw e; - } + next().doSend(data); } completed(); } diff --git a/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java b/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java index a9eb47779..01634f2f2 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java @@ -34,6 +34,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; @@ -196,7 +197,13 @@ public void onRead(@Nonnull ByteBuffer data) throws IOException { assert futureChannel.isDone(); try { channel = futureChannel.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + InterruptedIOException ie = new InterruptedIOException(); + ie.bytesTransferred = data.remaining(); + data.position(data.limit()); + Thread.currentThread().interrupt(); + throw ie; + } catch (ExecutionException e) { // should never get here as futureChannel.isDone(), but just in case we do throw away data.position(data.limit()); // dump any remaining data as nobody will ever receive it throw new IOException(e); @@ -212,9 +219,11 @@ public void onRead(@Nonnull ByteBuffer data) throws IOException { } catch (InterruptedException e) { // if the channel receive was interrupted we cannot guarantee that the partial state has been correctly // stored, thus we cannot trust the channel instance any more and it needs to be closed. - IOException reason = new IOException(e); + InterruptedIOException reason = new InterruptedIOException(); + reason.bytesTransferred = data.remaining(); channel.terminate(reason); data.position(data.limit()); // dump any remaining data as nobody will ever receive it + Thread.currentThread().interrupt(); throw reason; } } diff --git a/src/main/java/org/jenkinsci/remoting/protocol/impl/ConnectionHeadersFilterLayer.java b/src/main/java/org/jenkinsci/remoting/protocol/impl/ConnectionHeadersFilterLayer.java index 50bbb9f37..9f49d5ed4 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/impl/ConnectionHeadersFilterLayer.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/impl/ConnectionHeadersFilterLayer.java @@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import org.jenkinsci.remoting.protocol.FilterLayer; @@ -110,7 +111,7 @@ public class ConnectionHeadersFilterLayer extends FilterLayer { /** * Marker for the abort reason */ - private volatile ConnectionRefusalException aborted; + private final AtomicReference aborted = new AtomicReference<>(); /** * Constructor. @@ -143,7 +144,7 @@ public synchronized void start() { */ @Override public void onRecv(@Nonnull ByteBuffer data) throws IOException { - final ConnectionRefusalException aborted = this.aborted; + final ConnectionRefusalException aborted = this.aborted.get(); if (aborted != null) { throw newAbortCause(aborted); } @@ -395,7 +396,7 @@ private synchronized void doStartAbort(ConnectionRefusalException cause, ByteBuf * Finalizes the abort of the connection. */ private synchronized void onAbortCompleted() { - ConnectionHeadersFilterLayer.this.aborted = abortCause; + ConnectionHeadersFilterLayer.this.aborted.set(abortCause); abort(abortCause); try { next().doCloseSend(); @@ -420,7 +421,7 @@ public synchronized void onRecvClosed(IOException cause) throws IOException { super.onRecvClosed(cause); return; } - ConnectionRefusalException aborted = this.aborted; + ConnectionRefusalException aborted = this.aborted.get(); if (aborted != null && !(cause instanceof ConnectionRefusalException)) { // handle the case where we have refuseded the incoming headers and actually aborted ConnectionRefusalException newCause = newAbortCause(aborted); @@ -449,7 +450,7 @@ public synchronized void onRecvClosed(IOException cause) throws IOException { */ @Override public boolean isRecvOpen() { - return aborted == null && super.isRecvOpen(); + return aborted.get() == null && super.isRecvOpen(); } /** @@ -457,7 +458,7 @@ public boolean isRecvOpen() { */ @Override public void doSend(@Nonnull ByteBuffer data) throws IOException { - ConnectionRefusalException aborted = this.aborted; + ConnectionRefusalException aborted = this.aborted.get(); if (aborted != null) { throw newAbortCause(aborted); } @@ -510,7 +511,7 @@ public void doSend(@Nonnull ByteBuffer data) throws IOException { */ @Override public boolean isSendOpen() { - return aborted == null && super.isSendOpen(); + return aborted.get() == null && super.isSendOpen(); } /** diff --git a/src/main/java/org/jenkinsci/remoting/util/SettableFuture.java b/src/main/java/org/jenkinsci/remoting/util/SettableFuture.java index 999d19c93..599cd9f9a 100644 --- a/src/main/java/org/jenkinsci/remoting/util/SettableFuture.java +++ b/src/main/java/org/jenkinsci/remoting/util/SettableFuture.java @@ -273,8 +273,8 @@ public void addListener(@Nonnull Runnable listener, @Nonnull Executor executor) try { executor.execute(listener); } catch (RuntimeException e) { - LOGGER.log(Level.SEVERE, - "RuntimeException while executing runnable " + listener + " with executor " + executor, e); + LOGGER.log(Level.SEVERE, e, () -> + "RuntimeException while executing runnable " + listener + " with executor " + executor); } } } @@ -294,9 +294,9 @@ private void notifyListeners() { try { entry.getValue().execute(entry.getKey()); } catch (RuntimeException e) { - LOGGER.log(Level.SEVERE, + LOGGER.log(Level.SEVERE, e, () -> "RuntimeException while executing runnable " + entry.getKey() + " with executor " - + entry.getValue(), e); + + entry.getValue()); } } diff --git a/src/test/java/org/jenkinsci/remoting/engine/HandlerLoopbackLoadStress.java b/src/test/java/org/jenkinsci/remoting/engine/HandlerLoopbackLoadStress.java index 07b3432fe..5868f164b 100644 --- a/src/test/java/org/jenkinsci/remoting/engine/HandlerLoopbackLoadStress.java +++ b/src/test/java/org/jenkinsci/remoting/engine/HandlerLoopbackLoadStress.java @@ -311,7 +311,7 @@ public static void main(String[] args) throws Exception { final SocketAddress serverAddress; if (config.client == null) { serverAddress = stress.startServer(config.listen); - Thread.sleep(1000); + TimeUnit.SECONDS.sleep(1); } else { serverAddress = toSocketAddress(config.client); }