diff --git a/src/qz/printer/status/StatusMonitor.java b/src/qz/printer/status/StatusMonitor.java index 0b16e9375..f123d8a9f 100644 --- a/src/qz/printer/status/StatusMonitor.java +++ b/src/qz/printer/status/StatusMonitor.java @@ -8,6 +8,7 @@ import org.eclipse.jetty.util.MultiMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.eclipse.jetty.websocket.api.Session; import qz.printer.PrintServiceMatcher; import qz.printer.info.NativePrinterMap; import qz.utils.PrintingUtilities; @@ -27,8 +28,11 @@ public class StatusMonitor { public static final String ALL_PRINTERS = ""; private static Thread printerConnectionsThread; + private static Thread statusEventDispatchThread; private static final HashMap notificationThreadCollection = new HashMap<>(); + private static final HashMap statusSessions = new HashMap<>(); private static final MultiMap clientPrinterConnections = new MultiMap<>(); + private static final LinkedList statusQueue = new LinkedList<>(); public synchronized static boolean launchNotificationThreads() { ArrayList printerNameList = new ArrayList<>(); @@ -74,45 +78,25 @@ public synchronized static void closeNotificationThreads() { } } - public synchronized static boolean startListening(SocketConnection connection, JSONObject params) throws JSONException { + public synchronized static boolean isListening(SocketConnection connection) { + return statusSessions.containsKey(connection); + } + + public synchronized static boolean startListening(SocketConnection connection, Session session, JSONObject params) throws JSONException { JSONArray printerNames = params.getJSONArray("printerNames"); - boolean jobData = params.optBoolean("jobData", false); - int maxJobData = params.optInt("maxJobData", -1); - PrintingUtilities.Flavor dataFlavor = PrintingUtilities.Flavor.parse(params, PrintingUtilities.Flavor.PLAIN); + statusSessions.putIfAbsent(connection, new StatusSession(session)); if (printerNames.isNull(0)) { //listen to all printers - if (jobData) { - connection.getStatusListener().enableJobDataOnPrinter(ALL_PRINTERS, maxJobData, dataFlavor); - } - if (!clientPrinterConnections.containsKey(ALL_PRINTERS)) { - clientPrinterConnections.add(ALL_PRINTERS, connection); - } else if (!clientPrinterConnections.getValues(ALL_PRINTERS).contains(connection)) { - clientPrinterConnections.add(ALL_PRINTERS, connection); - } + addClientPrinterConnection(ALL_PRINTERS, connection, params); } else { // listen to specific printer(s) for (int i = 0; i < printerNames.length(); i++) { String printerName = printerNames.getString(i); - if (SystemUtilities.isMac()) { - // Since 2.0: Mac printers use descriptions as printer names; Find CUPS ID by Description - printerName = NativePrinterMap.getInstance().lookupPrinterId(printerName); - // Handle edge-case where printer was recently renamed/added - if (printerName == null) { - // Call PrintServiceLookup.lookupPrintServices again - PrintServiceMatcher.getNativePrinterList(true); - printerName = NativePrinterMap.getInstance().lookupPrinterId(printerNames.getString(i)); - } - } + if (SystemUtilities.isMac()) printerName = macNameFix(printerName); + if (printerName == null || printerName.equals("")) { throw new IllegalArgumentException(); } - if(jobData) { - connection.getStatusListener().enableJobDataOnPrinter(printerName, maxJobData, dataFlavor); - } - if (!clientPrinterConnections.containsKey(printerName)) { - clientPrinterConnections.add(printerName, connection); - } else if (!clientPrinterConnections.getValues(printerName).contains(connection)) { - clientPrinterConnections.add(printerName, connection); - } + addClientPrinterConnection(printerName, connection, params); } } @@ -124,6 +108,26 @@ public synchronized static boolean startListening(SocketConnection connection, J } } + public synchronized static void stopListening(SocketConnection connection) { + statusSessions.remove(connection); + closeListener(connection); + } + + private synchronized static void addClientPrinterConnection(String printerName, SocketConnection connection, JSONObject params) { + boolean jobData = params.optBoolean("jobData", false); + int maxJobData = params.optInt("maxJobData", -1); + PrintingUtilities.Flavor dataFlavor = PrintingUtilities.Flavor.parse(params, PrintingUtilities.Flavor.PLAIN); + + if (jobData) { + statusSessions.get(connection).enableJobDataOnPrinter(printerName, maxJobData, dataFlavor); + } + if (!clientPrinterConnections.containsKey(printerName)) { + clientPrinterConnections.add(printerName, connection); + } else if (!clientPrinterConnections.getValues(printerName).contains(connection)) { + clientPrinterConnections.add(printerName, connection); + } + } + public synchronized static void sendStatuses(SocketConnection connection) { boolean sendForAllPrinters = false; ArrayList statuses = isWindows() ? WmiPrinterStatusThread.getAllStatuses(): CupsUtils.getAllStatuses(); @@ -135,22 +139,20 @@ public synchronized static void sendStatuses(SocketConnection connection) { for (Status status : statuses) { if (sendForAllPrinters) { - connection.getStatusListener().statusChanged(status); + statusSessions.get(connection).statusChanged(status); } else { connections = clientPrinterConnections.get(status.getPrinter()); if ((connections != null) && connections.contains(connection)) { - connection.getStatusListener().statusChanged(status); + statusSessions.get(connection).statusChanged(status); } } } } public synchronized static void closeListener(SocketConnection connection) { - for (Iterator>> i = clientPrinterConnections.entrySet().iterator(); i.hasNext();) { - if (i.next().getValue().contains(connection)) { - i.remove(); - } - } + clientPrinterConnections.entrySet().removeIf((Map.Entry> entry) -> ( + entry.getValue().contains(connection) + )); if (clientPrinterConnections.isEmpty()) { if (isWindows()) { closeNotificationThreads(); @@ -160,18 +162,63 @@ public synchronized static void closeListener(SocketConnection connection) { } } - public synchronized static void statusChanged(Status[] statuses) { - HashSet connections = new HashSet<>(); - for (Status status : statuses) { - if (clientPrinterConnections.containsKey(status.getPrinter())) { - connections.addAll(clientPrinterConnections.get(status.getPrinter())); - } - if (clientPrinterConnections.containsKey(ALL_PRINTERS)) { - connections.addAll(clientPrinterConnections.get(ALL_PRINTERS)); - } - for (SocketConnection connection : connections) { - connection.getStatusListener().statusChanged(status); + private synchronized static void launchStatusEventDispatchThread() { + // Null is our main test to see if the thread needs to restart. If the thread was suspended, it won't be null, so check to see if it is alive as well. + if (statusEventDispatchThread != null && statusEventDispatchThread.isAlive()) return; + statusEventDispatchThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted() && dispatchStatusEvent()) { + // If we don't yield, this will constantly run dispatchStatusEvent and lock up the class, even though this thread isn't synchronized. + Thread.yield(); } + if (Thread.currentThread().isInterrupted()) log.warn("statusEventDispatchThread Interrupted"); + }, "statusEventDispatchThread"); + statusEventDispatchThread.start(); + } + + public synchronized static void statusChanged(Status[] statuses) { + // Add statuses to the queue, statusEventDispatchThread will resolve these one at a time until the queue is empty + Collections.addAll(statusQueue, statuses); + if (!statusQueue.isEmpty()) { + // If statusEventDispatchThread isn't already running, launch it + launchStatusEventDispatchThread(); + } + } + + // This is the main body of the statusEventDispatchThread. + // Dispatch one status event to n clients connection, based on clientPrinterConnections + // Returns false when there are no more statuses in the queue + private synchronized static boolean dispatchStatusEvent() { + if (statusQueue.isEmpty()) { + // Returning false will kill statusEventDispatchThread, but we also want to null out the value while we are still in a synchronized method + statusEventDispatchThread = null; + return false; + } + Status status = statusQueue.removeFirst(); + + HashSet listeningConnections = new HashSet<>(); + if (clientPrinterConnections.containsKey(status.getPrinter())) { + // Find every client that subscribed to this printer + listeningConnections.addAll(clientPrinterConnections.get(status.getPrinter())); + } + if (clientPrinterConnections.containsKey(ALL_PRINTERS)) { + // And find every client that subscribed to all printers + listeningConnections.addAll(clientPrinterConnections.get(ALL_PRINTERS)); + } + for (SocketConnection connection : listeningConnections) { + statusSessions.get(connection).statusChanged(status); + } + return true; + } + + private static String macNameFix(String printerName) { + // Since 2.0: Mac printers use descriptions as printer names; Find CUPS ID by Description + String returnString = NativePrinterMap.getInstance().lookupPrinterId(printerName); + // Handle edge-case where printer was recently renamed/added + if (returnString == null) { + // Call PrintServiceLookup.lookupPrintServices again + PrintServiceMatcher.getNativePrinterList(true); + returnString = NativePrinterMap.getInstance().lookupPrinterId(printerName); } + return returnString; } -} +} \ No newline at end of file diff --git a/src/qz/ws/PrintSocketClient.java b/src/qz/ws/PrintSocketClient.java index 567a167a6..4ffb1a5bf 100644 --- a/src/qz/ws/PrintSocketClient.java +++ b/src/qz/ws/PrintSocketClient.java @@ -20,10 +20,8 @@ import qz.communication.*; import qz.printer.PrintServiceMatcher; import qz.printer.status.StatusMonitor; -import qz.printer.status.StatusSession; import qz.utils.*; -import javax.management.ListenerNotFoundException; import javax.usb.util.UsbUtil; import java.awt.*; import java.io.EOFException; @@ -225,7 +223,7 @@ private boolean validSignature(Certificate certificate, JSONObject message) thro * @param session WebSocket session * @param json JSON received from web API */ - private void processMessage(Session session, JSONObject json, SocketConnection connection, RequestState request) throws JSONException, SerialPortException, DeviceException, IOException, ListenerNotFoundException { + private void processMessage(Session session, JSONObject json, SocketConnection connection, RequestState request) throws JSONException, SerialPortException, DeviceException, IOException { String UID = json.optString("uid"); SocketMethod call = SocketMethod.findFromCall(json.optString("call")); JSONObject params = json.optJSONObject("params"); @@ -291,14 +289,14 @@ private void processMessage(Session session, JSONObject json, SocketConnection c sendResult(session, UID, PrintServiceMatcher.getPrintersJSON(true)); break; case PRINTERS_START_LISTENING: - if (!connection.hasStatusListener()) { - connection.startStatusListener(new StatusSession(session)); + if (StatusMonitor.startListening(connection, session, params)) { + sendResult(session, UID, null); + } else { + sendError(session, UID, "Listening failed."); } - StatusMonitor.startListening(connection, params); - sendResult(session, UID, null); break; case PRINTERS_GET_STATUS: - if (connection.hasStatusListener()) { + if (StatusMonitor.isListening(connection)) { StatusMonitor.sendStatuses(connection); } else { sendError(session, UID, "No printer listeners started for this client."); @@ -306,9 +304,7 @@ private void processMessage(Session session, JSONObject json, SocketConnection c sendResult(session, UID, null); break; case PRINTERS_STOP_LISTENING: - if (connection.hasStatusListener()) { - connection.stopStatusListener(); - } + StatusMonitor.stopListening(connection); sendResult(session, UID, null); break; case PRINT: @@ -795,4 +791,4 @@ private static synchronized void send(Session session, JSONObject reply) throws } } -} +} \ No newline at end of file diff --git a/src/qz/ws/SocketConnection.java b/src/qz/ws/SocketConnection.java index dbadd164d..9e4e1c079 100644 --- a/src/qz/ws/SocketConnection.java +++ b/src/qz/ws/SocketConnection.java @@ -6,7 +6,6 @@ import qz.auth.Certificate; import qz.communication.*; import qz.printer.status.StatusMonitor; -import qz.printer.status.StatusSession; import qz.utils.FileWatcher; import java.io.IOException; @@ -21,7 +20,6 @@ public class SocketConnection { private Certificate certificate; private DeviceListener deviceListener; - private StatusSession statusListener; // serial port -> open SerialIO private final HashMap openSerialPorts = new HashMap<>(); @@ -89,24 +87,6 @@ public void stopDeviceListening() { deviceListener = null; } - public synchronized boolean hasStatusListener() { - return statusListener != null; - } - - public synchronized void startStatusListener(StatusSession listener) { - statusListener = listener; - } - - public synchronized void stopStatusListener() { - StatusMonitor.closeListener(this); - statusListener = null; - } - - public synchronized StatusSession getStatusListener() { - return statusListener; - } - - public void addFileListener(Path absolute, FileIO listener) { openFiles.put(absolute, listener); } @@ -169,7 +149,7 @@ public synchronized void disconnect() throws SerialPortException, DeviceExceptio removeAllFileListeners(); stopDeviceListening(); - stopStatusListener(); + StatusMonitor.stopListening(this); } -} +} \ No newline at end of file