Skip to content

Commit

Permalink
Fix a deadlock with printer status threading (qzind#1181)
Browse files Browse the repository at this point in the history
Fix thread deadlock
Related qzind#1116, qzind#1124

Co-authored-by: Tres Finocchiaro <tres.finocchiaro@gmail.com>
  • Loading branch information
Vzor- and tresf authored Oct 4, 2023
1 parent 4d1eb2b commit c6b0ffa
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 83 deletions.
145 changes: 96 additions & 49 deletions src/qz/printer/status/StatusMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.eclipse.jetty.util.MultiMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.websocket.api.Session;
import qz.printer.PrintServiceMatcher;
import qz.printer.info.NativePrinterMap;
import qz.utils.PrintingUtilities;
Expand All @@ -27,8 +28,11 @@ public class StatusMonitor {
public static final String ALL_PRINTERS = "";

private static Thread printerConnectionsThread;
private static Thread statusEventDispatchThread;
private static final HashMap<String,Thread> notificationThreadCollection = new HashMap<>();
private static final HashMap<SocketConnection, StatusSession> statusSessions = new HashMap<>();
private static final MultiMap<SocketConnection> clientPrinterConnections = new MultiMap<>();
private static final LinkedList<Status> statusQueue = new LinkedList<>();

public synchronized static boolean launchNotificationThreads() {
ArrayList<String> printerNameList = new ArrayList<>();
Expand Down Expand Up @@ -74,45 +78,25 @@ public synchronized static void closeNotificationThreads() {
}
}

public synchronized static boolean startListening(SocketConnection connection, JSONObject params) throws JSONException {
public synchronized static boolean isListening(SocketConnection connection) {
return statusSessions.containsKey(connection);
}

public synchronized static boolean startListening(SocketConnection connection, Session session, JSONObject params) throws JSONException {
JSONArray printerNames = params.getJSONArray("printerNames");
boolean jobData = params.optBoolean("jobData", false);
int maxJobData = params.optInt("maxJobData", -1);
PrintingUtilities.Flavor dataFlavor = PrintingUtilities.Flavor.parse(params, PrintingUtilities.Flavor.PLAIN);
statusSessions.putIfAbsent(connection, new StatusSession(session));

if (printerNames.isNull(0)) { //listen to all printers
if (jobData) {
connection.getStatusListener().enableJobDataOnPrinter(ALL_PRINTERS, maxJobData, dataFlavor);
}
if (!clientPrinterConnections.containsKey(ALL_PRINTERS)) {
clientPrinterConnections.add(ALL_PRINTERS, connection);
} else if (!clientPrinterConnections.getValues(ALL_PRINTERS).contains(connection)) {
clientPrinterConnections.add(ALL_PRINTERS, connection);
}
addClientPrinterConnection(ALL_PRINTERS, connection, params);
} else { // listen to specific printer(s)
for (int i = 0; i < printerNames.length(); i++) {
String printerName = printerNames.getString(i);
if (SystemUtilities.isMac()) {
// Since 2.0: Mac printers use descriptions as printer names; Find CUPS ID by Description
printerName = NativePrinterMap.getInstance().lookupPrinterId(printerName);
// Handle edge-case where printer was recently renamed/added
if (printerName == null) {
// Call PrintServiceLookup.lookupPrintServices again
PrintServiceMatcher.getNativePrinterList(true);
printerName = NativePrinterMap.getInstance().lookupPrinterId(printerNames.getString(i));
}
}
if (SystemUtilities.isMac()) printerName = macNameFix(printerName);

if (printerName == null || printerName.equals("")) {
throw new IllegalArgumentException();
}
if(jobData) {
connection.getStatusListener().enableJobDataOnPrinter(printerName, maxJobData, dataFlavor);
}
if (!clientPrinterConnections.containsKey(printerName)) {
clientPrinterConnections.add(printerName, connection);
} else if (!clientPrinterConnections.getValues(printerName).contains(connection)) {
clientPrinterConnections.add(printerName, connection);
}
addClientPrinterConnection(printerName, connection, params);
}
}

Expand All @@ -124,6 +108,26 @@ public synchronized static boolean startListening(SocketConnection connection, J
}
}

public synchronized static void stopListening(SocketConnection connection) {
statusSessions.remove(connection);
closeListener(connection);
}

private synchronized static void addClientPrinterConnection(String printerName, SocketConnection connection, JSONObject params) {
boolean jobData = params.optBoolean("jobData", false);
int maxJobData = params.optInt("maxJobData", -1);
PrintingUtilities.Flavor dataFlavor = PrintingUtilities.Flavor.parse(params, PrintingUtilities.Flavor.PLAIN);

if (jobData) {
statusSessions.get(connection).enableJobDataOnPrinter(printerName, maxJobData, dataFlavor);
}
if (!clientPrinterConnections.containsKey(printerName)) {
clientPrinterConnections.add(printerName, connection);
} else if (!clientPrinterConnections.getValues(printerName).contains(connection)) {
clientPrinterConnections.add(printerName, connection);
}
}

public synchronized static void sendStatuses(SocketConnection connection) {
boolean sendForAllPrinters = false;
ArrayList<Status> statuses = isWindows() ? WmiPrinterStatusThread.getAllStatuses(): CupsUtils.getAllStatuses();
Expand All @@ -135,22 +139,20 @@ public synchronized static void sendStatuses(SocketConnection connection) {

for (Status status : statuses) {
if (sendForAllPrinters) {
connection.getStatusListener().statusChanged(status);
statusSessions.get(connection).statusChanged(status);
} else {
connections = clientPrinterConnections.get(status.getPrinter());
if ((connections != null) && connections.contains(connection)) {
connection.getStatusListener().statusChanged(status);
statusSessions.get(connection).statusChanged(status);
}
}
}
}

public synchronized static void closeListener(SocketConnection connection) {
for (Iterator<Map.Entry<String, List<SocketConnection>>> i = clientPrinterConnections.entrySet().iterator(); i.hasNext();) {
if (i.next().getValue().contains(connection)) {
i.remove();
}
}
clientPrinterConnections.entrySet().removeIf((Map.Entry<String, List<SocketConnection>> entry) -> (
entry.getValue().contains(connection)
));
if (clientPrinterConnections.isEmpty()) {
if (isWindows()) {
closeNotificationThreads();
Expand All @@ -160,18 +162,63 @@ public synchronized static void closeListener(SocketConnection connection) {
}
}

public synchronized static void statusChanged(Status[] statuses) {
HashSet<SocketConnection> connections = new HashSet<>();
for (Status status : statuses) {
if (clientPrinterConnections.containsKey(status.getPrinter())) {
connections.addAll(clientPrinterConnections.get(status.getPrinter()));
}
if (clientPrinterConnections.containsKey(ALL_PRINTERS)) {
connections.addAll(clientPrinterConnections.get(ALL_PRINTERS));
}
for (SocketConnection connection : connections) {
connection.getStatusListener().statusChanged(status);
private synchronized static void launchStatusEventDispatchThread() {
// Null is our main test to see if the thread needs to restart. If the thread was suspended, it won't be null, so check to see if it is alive as well.
if (statusEventDispatchThread != null && statusEventDispatchThread.isAlive()) return;
statusEventDispatchThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted() && dispatchStatusEvent()) {
// If we don't yield, this will constantly run dispatchStatusEvent and lock up the class, even though this thread isn't synchronized.
Thread.yield();
}
if (Thread.currentThread().isInterrupted()) log.warn("statusEventDispatchThread Interrupted");
}, "statusEventDispatchThread");
statusEventDispatchThread.start();
}

public synchronized static void statusChanged(Status[] statuses) {
// Add statuses to the queue, statusEventDispatchThread will resolve these one at a time until the queue is empty
Collections.addAll(statusQueue, statuses);
if (!statusQueue.isEmpty()) {
// If statusEventDispatchThread isn't already running, launch it
launchStatusEventDispatchThread();
}
}

// This is the main body of the statusEventDispatchThread.
// Dispatch one status event to n clients connection, based on clientPrinterConnections
// Returns false when there are no more statuses in the queue
private synchronized static boolean dispatchStatusEvent() {
if (statusQueue.isEmpty()) {
// Returning false will kill statusEventDispatchThread, but we also want to null out the value while we are still in a synchronized method
statusEventDispatchThread = null;
return false;
}
Status status = statusQueue.removeFirst();

HashSet<SocketConnection> listeningConnections = new HashSet<>();
if (clientPrinterConnections.containsKey(status.getPrinter())) {
// Find every client that subscribed to this printer
listeningConnections.addAll(clientPrinterConnections.get(status.getPrinter()));
}
if (clientPrinterConnections.containsKey(ALL_PRINTERS)) {
// And find every client that subscribed to all printers
listeningConnections.addAll(clientPrinterConnections.get(ALL_PRINTERS));
}
for (SocketConnection connection : listeningConnections) {
statusSessions.get(connection).statusChanged(status);
}
return true;
}

private static String macNameFix(String printerName) {
// Since 2.0: Mac printers use descriptions as printer names; Find CUPS ID by Description
String returnString = NativePrinterMap.getInstance().lookupPrinterId(printerName);
// Handle edge-case where printer was recently renamed/added
if (returnString == null) {
// Call PrintServiceLookup.lookupPrintServices again
PrintServiceMatcher.getNativePrinterList(true);
returnString = NativePrinterMap.getInstance().lookupPrinterId(printerName);
}
return returnString;
}
}
}
20 changes: 8 additions & 12 deletions src/qz/ws/PrintSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import qz.communication.*;
import qz.printer.PrintServiceMatcher;
import qz.printer.status.StatusMonitor;
import qz.printer.status.StatusSession;
import qz.utils.*;

import javax.management.ListenerNotFoundException;
import javax.usb.util.UsbUtil;
import java.awt.*;
import java.io.EOFException;
Expand Down Expand Up @@ -225,7 +223,7 @@ private boolean validSignature(Certificate certificate, JSONObject message) thro
* @param session WebSocket session
* @param json JSON received from web API
*/
private void processMessage(Session session, JSONObject json, SocketConnection connection, RequestState request) throws JSONException, SerialPortException, DeviceException, IOException, ListenerNotFoundException {
private void processMessage(Session session, JSONObject json, SocketConnection connection, RequestState request) throws JSONException, SerialPortException, DeviceException, IOException {
String UID = json.optString("uid");
SocketMethod call = SocketMethod.findFromCall(json.optString("call"));
JSONObject params = json.optJSONObject("params");
Expand Down Expand Up @@ -291,24 +289,22 @@ private void processMessage(Session session, JSONObject json, SocketConnection c
sendResult(session, UID, PrintServiceMatcher.getPrintersJSON(true));
break;
case PRINTERS_START_LISTENING:
if (!connection.hasStatusListener()) {
connection.startStatusListener(new StatusSession(session));
if (StatusMonitor.startListening(connection, session, params)) {
sendResult(session, UID, null);
} else {
sendError(session, UID, "Listening failed.");
}
StatusMonitor.startListening(connection, params);
sendResult(session, UID, null);
break;
case PRINTERS_GET_STATUS:
if (connection.hasStatusListener()) {
if (StatusMonitor.isListening(connection)) {
StatusMonitor.sendStatuses(connection);
} else {
sendError(session, UID, "No printer listeners started for this client.");
}
sendResult(session, UID, null);
break;
case PRINTERS_STOP_LISTENING:
if (connection.hasStatusListener()) {
connection.stopStatusListener();
}
StatusMonitor.stopListening(connection);
sendResult(session, UID, null);
break;
case PRINT:
Expand Down Expand Up @@ -795,4 +791,4 @@ private static synchronized void send(Session session, JSONObject reply) throws
}
}

}
}
24 changes: 2 additions & 22 deletions src/qz/ws/SocketConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import qz.auth.Certificate;
import qz.communication.*;
import qz.printer.status.StatusMonitor;
import qz.printer.status.StatusSession;
import qz.utils.FileWatcher;

import java.io.IOException;
Expand All @@ -21,7 +20,6 @@ public class SocketConnection {
private Certificate certificate;

private DeviceListener deviceListener;
private StatusSession statusListener;

// serial port -> open SerialIO
private final HashMap<String,SerialIO> openSerialPorts = new HashMap<>();
Expand Down Expand Up @@ -89,24 +87,6 @@ public void stopDeviceListening() {
deviceListener = null;
}

public synchronized boolean hasStatusListener() {
return statusListener != null;
}

public synchronized void startStatusListener(StatusSession listener) {
statusListener = listener;
}

public synchronized void stopStatusListener() {
StatusMonitor.closeListener(this);
statusListener = null;
}

public synchronized StatusSession getStatusListener() {
return statusListener;
}


public void addFileListener(Path absolute, FileIO listener) {
openFiles.put(absolute, listener);
}
Expand Down Expand Up @@ -169,7 +149,7 @@ public synchronized void disconnect() throws SerialPortException, DeviceExceptio

removeAllFileListeners();
stopDeviceListening();
stopStatusListener();
StatusMonitor.stopListening(this);
}

}
}

0 comments on commit c6b0ffa

Please sign in to comment.