From 10d8671b4622a1c388c0f2ad43f3a3f521e084e9 Mon Sep 17 00:00:00 2001 From: Michael Dombrowski Date: Thu, 14 Sep 2023 18:42:34 -0400 Subject: [PATCH] fix: check for 0 subscription batches (#124) --- .../greengrass/mqtt/moquette/MQTTService.java | 28 ++++++++++--------- .../java/io/moquette/broker/PostOffice.java | 10 ++++++- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java index f03944be..3c33324b 100644 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java @@ -162,21 +162,21 @@ private Properties getProperties() { Topics moquetteTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, "moquette"); String password = brokerKeyStore.getJksPassword(); - p.setProperty(BrokerConstants.HOST_PROPERTY_NAME, - Coerce.toString(moquetteTopics.lookup(BrokerConstants.HOST_PROPERTY_NAME).dflt(BrokerConstants.HOST))); + p.setProperty(IConfig.HOST_PROPERTY_NAME, + Coerce.toString(moquetteTopics.lookup(IConfig.HOST_PROPERTY_NAME).dflt(BrokerConstants.HOST))); // Due to a deserialization bug in GSON during group deployments, this value can become a floating point // instead of an int. As a workaround, coerce to an int before converting back to a string - p.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, - String.valueOf(Coerce.toInt(moquetteTopics.lookup(BrokerConstants.SSL_PORT_PROPERTY_NAME).dflt("8883")))); - p.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, brokerKeyStore.getJksPath()); - p.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, password); - p.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, password); - p.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "false"); + p.setProperty(IConfig.SSL_PORT_PROPERTY_NAME, + String.valueOf(Coerce.toInt(moquetteTopics.lookup(IConfig.SSL_PORT_PROPERTY_NAME).dflt("8883")))); + p.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, brokerKeyStore.getJksPath()); + p.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, password); + p.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, password); + p.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, "false"); p.setProperty(BrokerConstants.PEER_CERTIFICATE_AS_USERNAME, "true"); p.setProperty(BrokerConstants.NEED_CLIENT_AUTH, "true"); - p.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true"); - p.setProperty(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(Coerce - .toInt(moquetteTopics.lookup(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME).dflt("131072")))); // 128KiB + p.setProperty(IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME, "0"); + p.setProperty(IConfig.NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(Coerce + .toInt(moquetteTopics.lookup(IConfig.NETTY_MAX_BYTES_PROPERTY_NAME).dflt("131072")))); // 128KiB p.setProperty(BrokerConstants.NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME, "TLSv1.2"); p.setProperty(BrokerConstants.NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME, Coerce.toString( rootConfig.lookup(BrokerConstants.NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME) @@ -186,10 +186,12 @@ private Properties getProperties() { .dflt(BrokerConstants.DEFAULT_NETTY_CHANNEL_READ_LIMIT_BYTES))); //Disable plain TCP port - p.setProperty(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND); + p.setProperty(IConfig.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND); // Telemetry is actually deleted from the code base, but just set the flag here to be sure. - p.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); + p.setProperty(IConfig.ENABLE_TELEMETRY_NAME, "false"); + + p.setProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "false"); return p; } diff --git a/moquette-0.17/broker/src/main/java/io/moquette/broker/PostOffice.java b/moquette-0.17/broker/src/main/java/io/moquette/broker/PostOffice.java index fa2631f2..866b2d5a 100644 --- a/moquette-0.17/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/moquette-0.17/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -469,7 +469,15 @@ private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS collector.add(sub); } } - payload.retain(collector.countBatches()); + + int subscriptionCount = collector.countBatches(); + if (subscriptionCount <= 0) { + // no matching subscriptions, clean exit + LOG.trace("No matching subscriptions for topic: {}", topic); + return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null)); + } + + payload.retain(subscriptionCount); List publishResults = collector.routeBatchedPublishes((batch) -> { publishToSession(payload, topic, batch, publishingQos);