Skip to content

Commit

Permalink
Various cleanups (#442)
Browse files Browse the repository at this point in the history
* Use a real exception

Signed-off-by: Matt Sicker <boards@gmail.com>

* Clean up old todo comment

Signed-off-by: Matt Sicker <boards@gmail.com>

* Remove unnecessary lines

Signed-off-by: Matt Sicker <boards@gmail.com>

* Use more appropriate types

Signed-off-by: Matt Sicker <boards@gmail.com>

* Use lambda logging and try-with-resources

Signed-off-by: Matt Sicker <boards@gmail.com>

* Maintain thread interrupt flag more consistently

Signed-off-by: Matt Sicker <boards@gmail.com>

* Adjust thread-safety for Channel.

Make it coherent.

* Revert some unnecessary cleanups

Signed-off-by: Matt Sicker <boards@gmail.com>

* Update src/main/java/hudson/remoting/ChunkedOutputStream.java

Co-authored-by: Oleg Nenashev <o.v.nenashev@gmail.com>

Co-authored-by: Jeff Thompson <jeffret.g@gmail.com>
Co-authored-by: Oleg Nenashev <o.v.nenashev@gmail.com>
  • Loading branch information
3 people authored May 7, 2021
1 parent d4fe474 commit f767654
Show file tree
Hide file tree
Showing 28 changed files with 129 additions and 138 deletions.
67 changes: 31 additions & 36 deletions src/main/java/hudson/remoting/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.Hashtable;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -159,7 +159,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
* Requests that are sent to the remote side for execution, yet we are waiting locally until
* we hear back their responses.
*/
/*package*/ final Map<Integer,Request<? extends Serializable,? extends Throwable>> pendingCalls = new Hashtable<>();
/*package*/ final Map<Integer,Request<? extends Serializable,? extends Throwable>> pendingCalls = new ConcurrentHashMap<>();

/**
* Remembers last I/O ID issued from locally to the other side, per thread.
Expand All @@ -170,8 +170,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
/**
* Records the {@link Request}s being executed on this channel, sent by the remote peer.
*/
/*package*/ final Map<Integer,Request<?,?>> executingCalls =
Collections.synchronizedMap(new Hashtable<>());
/*package*/ final Map<Integer,Request<?,?>> executingCalls = new ConcurrentHashMap<>();

/**
* {@link ClassLoader}s that are proxies of the remote classloaders.
Expand Down Expand Up @@ -219,15 +218,15 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
/**
* Number of {@link Command} objects sent to the other side.
*/
private volatile long commandsSent;
private final AtomicLong commandsSent = new AtomicLong();

/**
* Number of {@link Command} objects received from the other side.
*
* When a transport is functioning correctly, {@link #commandsSent} of one side
* and {@link #commandsReceived} of the other side should closely match.
*/
private volatile long commandsReceived;
private final AtomicLong commandsReceived = new AtomicLong();

/**
* Timestamp of the last {@link Command} object sent/received, in
Expand All @@ -241,7 +240,8 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
* without telling us anything, the {@link SocketOutputStream#write(int)} will
* return right away, and the socket only really times out after 10s of minutes.
*/
private volatile long lastCommandSentAt, lastCommandReceivedAt;
private final AtomicLong lastCommandSentAt = new AtomicLong();
private final AtomicLong lastCommandReceivedAt = new AtomicLong();

/**
* Timestamp of when this channel was connected/created, in
Expand Down Expand Up @@ -595,9 +595,9 @@ protected Channel(@Nonnull ChannelBuilder settings, @Nonnull CommandTransport tr
transport.setup(this, new CommandReceiver() {
@Override
public void handle(Command cmd) {
commandsReceived++;
commandsReceived.incrementAndGet();
long receivedAt = System.currentTimeMillis();
lastCommandReceivedAt = receivedAt;
lastCommandReceivedAt.set(receivedAt);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received " + cmd);
} else if (logger.isLoggable(Level.FINER)) {
Expand Down Expand Up @@ -764,8 +764,8 @@ private ExecutorService createPipeWriterExecutor() {
logger.fine("Send "+cmd);

transport.write(cmd, cmd instanceof CloseCommand);
commandsSent++;
lastCommandSentAt = System.currentTimeMillis();
commandsSent.incrementAndGet();
lastCommandSentAt.set(System.currentTimeMillis());
}

/**
Expand Down Expand Up @@ -1001,9 +1001,7 @@ V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
return r.retrieve(this, UserRequest.getClassLoader(callable));

// re-wrap the exception so that we can capture the stack trace of the caller.
} catch (ClassNotFoundException e) {
throw new IOException("Remote call on " + name + " failed", e);
} catch (Error e) {
} catch (ClassNotFoundException | Error e) {
throw new IOException("Remote call on " + name + " failed", e);
} catch (SecurityException e) {
throw new IOException("Failed to deserialize response to " + request + ": " + e, e);
Expand Down Expand Up @@ -1080,18 +1078,17 @@ public void terminate(@Nonnull IOException e) {
logger.log(Level.WARNING, "Failed to close down the reader side of the transport", x);
}
try {
synchronized (pendingCalls) {
for (Request<?, ?> req : pendingCalls.values())
req.abort(e);
pendingCalls.clear();
for (Request<?, ?> req : pendingCalls.values()) {
req.abort(e);
}
synchronized (executingCalls) {
for (Request<?, ?> r : executingCalls.values()) {
java.util.concurrent.Future<?> f = r.future;
if (f != null) f.cancel(true);
pendingCalls.clear();
for (Request<?, ?> r : executingCalls.values()) {
java.util.concurrent.Future<?> f = r.future;
if (f != null) {
f.cancel(true);
}
executingCalls.clear();
}
executingCalls.clear();
exportedObjects.abort(e);
// break any object cycles into simple chains to simplify work for the garbage collector
reference.clear(e);
Expand Down Expand Up @@ -1184,7 +1181,7 @@ public synchronized void join() throws InterruptedException {
// given tickets like JENKINS-20709 that talks about hangs, it seems
// like a good defensive measure to periodically wake up to make sure
// that the wait condition is still not met in case we don't call notifyAll correctly
wait(30*1000);
wait(TimeUnit.SECONDS.toMillis(30));
}

/**
Expand Down Expand Up @@ -1438,13 +1435,14 @@ public void dumpPerformanceCounters(PrintWriter w) throws IOException {
public void dumpDiagnostics(@Nonnull PrintWriter w) throws IOException {
w.printf("Channel %s%n",name);
w.printf(" Created=%s%n", new Date(createdAt));
w.printf(" Commands sent=%d%n", commandsSent);
w.printf(" Commands received=%d%n", commandsReceived);
w.printf(" Last command sent=%s%n", new Date(lastCommandSentAt));
w.printf(" Last command received=%s%n", new Date(lastCommandReceivedAt));
w.printf(" Commands sent=%d%n", commandsSent.get());
w.printf(" Commands received=%d%n", commandsReceived.get());
w.printf(" Last command sent=%s%n", new Date(lastCommandSentAt.get()));
w.printf(" Last command received=%s%n", new Date(lastCommandReceivedAt.get()));

// TODO: Synchronize when Hashtable gets replaced by a modern collection.
w.printf(" Pending calls=%d%n", pendingCalls.size());
synchronized (pendingCalls) {
w.printf(" Pending calls=%d%n", pendingCalls.size());
}
}

/**
Expand Down Expand Up @@ -1760,11 +1758,8 @@ public void syncLocalIO() throws InterruptedException {
t.setName("I/O sync: "+old);
try {
// no one waits for the completion of this Runnable, so not using I/O ID
pipeWriter.submit(0,new Runnable() {
@Override
public void run() {
// noop
}
pipeWriter.submit(0, () -> {
// noop
}).get();
} catch (ExecutionException e) {
throw new AssertionError(e); // impossible
Expand Down Expand Up @@ -1835,7 +1830,7 @@ public ExportTable.ExportList startExportRecording() {
*/
public long getLastHeard() {
// TODO - this is not safe against clock skew and is called from jenkins core (and potentially plugins)
return lastCommandReceivedAt;
return lastCommandReceivedAt.get();
}

/*package*/ static Channel setCurrent(Channel channel) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/hudson/remoting/ChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Hashtable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class ChannelBuilder {
private final List<CallableDecorator> decorators = new ArrayList<>();
private boolean arbitraryCallableAllowed = true;
private boolean remoteClassLoadingAllowed = true;
private final Hashtable<Object,Object> properties = new Hashtable<>();
private final Map<Object,Object> properties = new HashMap<>();
private ClassFilter filter = ClassFilter.DEFAULT;

/**
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/hudson/remoting/ChunkedOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ class ChunkedOutputStream extends OutputStream {
private final OutputStream base;

public ChunkedOutputStream(int frameSize, OutputStream base) {
assert 0<frameSize && frameSize<=Short.MAX_VALUE;
if (frameSize < 0 || frameSize > Short.MAX_VALUE) {
throw new IllegalArgumentException("Illegal frame size: " + frameSize);
}

this.buf = new byte[frameSize];
size = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/hudson/remoting/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(@Nonnull Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, "Unhandled exception in thread " + t, e));
thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Unhandled exception in thread " + t));
return thread;
}
}
16 changes: 10 additions & 6 deletions src/main/java/hudson/remoting/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -119,7 +120,7 @@ public Thread newThread(@Nonnull final Runnable r) {
r.run();
});
thread.setDaemon(true);
thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, "Uncaught exception in thread " + t, e));
thread.setUncaughtExceptionHandler((t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in thread " + t));
return thread;
}
});
Expand Down Expand Up @@ -251,7 +252,7 @@ public Engine(EngineListener listener, List<URL> hudsonUrls, String secretKey, S
throw new IllegalArgumentException("No URLs given");
}
setUncaughtExceptionHandler((t, e) -> {
LOGGER.log(Level.SEVERE, "Uncaught exception in Engine thread " + t, e);
LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in Engine thread " + t);
interrupt();
});
}
Expand Down Expand Up @@ -575,8 +576,11 @@ public void onOpen(Session session, EndpointConfig config) {
private void onMessage(ByteBuffer message) {
try {
transport.receive(message);
} catch (IOException|InterruptedException x) {
} catch (IOException x) {
events.error(x);
} catch (InterruptedException x) {
events.error(x);
Thread.currentThread().interrupt();
}
}
@Override
Expand Down Expand Up @@ -649,7 +653,7 @@ public void closeRead() throws IOException {
} catch (IOException x) {
events.status(ping + " is not ready", x);
}
Thread.sleep(10_000);
TimeUnit.SECONDS.sleep(10);
}
events.onReconnect();
}
Expand Down Expand Up @@ -829,7 +833,7 @@ private JnlpEndpointResolver createEndpointResolver(List<String> jenkinsUrls) {

private void onConnectionRejected(String greeting) throws InterruptedException {
events.error(new Exception("The server rejected the connection: " + greeting));
Thread.sleep(10*1000);
TimeUnit.SECONDS.sleep(10);
}

/**
Expand All @@ -851,7 +855,7 @@ private Socket connectTcp(@Nonnull JnlpAgentEndpoint endpoint) throws IOExceptio
if(retry++>10) {
throw e;
}
Thread.sleep(1000*10);
TimeUnit.SECONDS.sleep(10);
events.status(msg+" (retrying:"+retry+")",e);
}
}
Expand Down
9 changes: 1 addition & 8 deletions src/main/java/hudson/remoting/FastPipedInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public int read(@Nonnull byte[] b, int off, int len) throws IOException {
try {
buffer.wait(FastPipedOutputStream.TIMEOUT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
// Try again.
Expand Down Expand Up @@ -204,13 +205,5 @@ static final class ClosedBy extends Throwable {
ClosedBy(Throwable error) {
super("The pipe was closed at...", error);
}

/**
* If the pipe was closed by inducing an error, return that error object.
*/
@Override
public Throwable getCause() {
return super.getCause();
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/hudson/remoting/FileSystemJarCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String toString() {
protected URL lookInCache(Channel channel, long sum1, long sum2) throws IOException, InterruptedException {
File jar = map(sum1, sum2);
if (jar.exists()) {
LOGGER.log(Level.FINER, String.format("Jar file cache hit %16X%16X",sum1,sum2));
LOGGER.log(Level.FINER, () -> String.format("Jar file cache hit %16X%16X",sum1,sum2));
if (touch) {
Files.setLastModifiedTime(PathUtils.fileToPath(jar), FileTime.fromMillis(System.currentTimeMillis()));
}
Expand Down Expand Up @@ -112,7 +112,7 @@ protected URL retrieve(Channel channel, long sum1, long sum2) throws IOException
File tmp = createTempJar(target);
try {
try (RemoteOutputStream o = new RemoteOutputStream(new FileOutputStream(tmp))) {
LOGGER.log(Level.FINE, String.format("Retrieving jar file %16X%16X", sum1, sum2));
LOGGER.log(Level.FINE, () -> String.format("Retrieving jar file %16X%16X", sum1, sum2));
getJarLoader(channel).writeJarTo(sum1, sum2, o);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void run() {
}
};
diagnosisThread.setUncaughtExceptionHandler(
(t, e) -> LOGGER.log(Level.SEVERE, "Uncaught exception in diagnosis thread " + t, e));
(t, e) -> LOGGER.log(Level.SEVERE, e, () -> "Uncaught exception in diagnosis thread " + t));

// wait up to 1 sec to grab as much data as possible
diagnosisThread.start();
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/hudson/remoting/ImportedClassLoaderTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

import hudson.remoting.RemoteClassLoader.IClassLoader;

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;

/**
Expand All @@ -37,7 +37,7 @@
*/
final class ImportedClassLoaderTable {
final Channel channel;
final Map<IClassLoader,ClassLoader> classLoaders = new Hashtable<>();
final Map<IClassLoader,ClassLoader> classLoaders = new ConcurrentHashMap<>();

ImportedClassLoaderTable(Channel channel) {
this.channel = channel;
Expand All @@ -50,7 +50,7 @@ final class ImportedClassLoaderTable {
* This method "consumes" the given oid for the purpose of reference counting.
*/
@Nonnull
public synchronized ClassLoader get(int oid) {
public ClassLoader get(int oid) {
return get(RemoteInvocationHandler.wrap(channel, oid, IClassLoader.class, false, false, false, false));
}

Expand All @@ -61,14 +61,8 @@ public synchronized ClassLoader get(int oid) {
* @return Classloader instance
*/
@Nonnull
public synchronized ClassLoader get(@Nonnull IClassLoader classLoaderProxy) {
ClassLoader r = classLoaders.get(classLoaderProxy);
if(r==null) {
// we need to be able to use the same hudson.remoting classes, hence delegate
// to this class loader.
r = RemoteClassLoader.create(channel.baseClassLoader,classLoaderProxy);
classLoaders.put(classLoaderProxy,r);
}
return r;
public ClassLoader get(@Nonnull IClassLoader classLoaderProxy) {
// we need to be able to use the same hudson.remoting classes, hence delegate to this class loader.
return classLoaders.computeIfAbsent(classLoaderProxy, proxy -> RemoteClassLoader.create(channel.baseClassLoader, proxy));
}
}
1 change: 1 addition & 0 deletions src/main/java/hudson/remoting/JarCacheSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void run() {
bailout(e);

LOGGER.log(Level.WARNING, String.format("Interrupted while resolving a jar %016x%016x", sum1, sum2), e);
Thread.currentThread().interrupt();
} catch (Throwable e) {
// in other general failures, we aren't retrying
// TODO: or should we?
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/hudson/remoting/JarLoaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void writeJarTo(long sum1, long sum2, OutputStream sink) throws IOExcepti
try {
channel.notifyJar(new File(url.toURI()));
} catch (URISyntaxException | IllegalArgumentException x) {
LOGGER.log(Level.WARNING, "cannot properly report " + url, x);
LOGGER.log(Level.WARNING, x, () -> "cannot properly report " + url);
}
} else {
LOGGER.log(Level.FINE, "serving non-file URL {0}", url);
Expand Down
Loading

0 comments on commit f767654

Please sign in to comment.