Skip to content

Commit

Permalink
fix: check for 0 subscription batches (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Sep 14, 2023
1 parent fdb7b0f commit 10d8671
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,21 @@ private Properties getProperties() {
Topics moquetteTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, "moquette");

String password = brokerKeyStore.getJksPassword();
p.setProperty(BrokerConstants.HOST_PROPERTY_NAME,
Coerce.toString(moquetteTopics.lookup(BrokerConstants.HOST_PROPERTY_NAME).dflt(BrokerConstants.HOST)));
p.setProperty(IConfig.HOST_PROPERTY_NAME,
Coerce.toString(moquetteTopics.lookup(IConfig.HOST_PROPERTY_NAME).dflt(BrokerConstants.HOST)));
// Due to a deserialization bug in GSON during group deployments, this value can become a floating point
// instead of an int. As a workaround, coerce to an int before converting back to a string
p.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME,
String.valueOf(Coerce.toInt(moquetteTopics.lookup(BrokerConstants.SSL_PORT_PROPERTY_NAME).dflt("8883"))));
p.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, brokerKeyStore.getJksPath());
p.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, password);
p.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, password);
p.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "false");
p.setProperty(IConfig.SSL_PORT_PROPERTY_NAME,
String.valueOf(Coerce.toInt(moquetteTopics.lookup(IConfig.SSL_PORT_PROPERTY_NAME).dflt("8883"))));
p.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, brokerKeyStore.getJksPath());
p.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, password);
p.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, password);
p.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, "false");
p.setProperty(BrokerConstants.PEER_CERTIFICATE_AS_USERNAME, "true");
p.setProperty(BrokerConstants.NEED_CLIENT_AUTH, "true");
p.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true");
p.setProperty(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(Coerce
.toInt(moquetteTopics.lookup(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME).dflt("131072")))); // 128KiB
p.setProperty(IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME, "0");
p.setProperty(IConfig.NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(Coerce
.toInt(moquetteTopics.lookup(IConfig.NETTY_MAX_BYTES_PROPERTY_NAME).dflt("131072")))); // 128KiB
p.setProperty(BrokerConstants.NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME, "TLSv1.2");
p.setProperty(BrokerConstants.NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME, Coerce.toString(
rootConfig.lookup(BrokerConstants.NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME)
Expand All @@ -186,10 +186,12 @@ private Properties getProperties() {
.dflt(BrokerConstants.DEFAULT_NETTY_CHANNEL_READ_LIMIT_BYTES)));

//Disable plain TCP port
p.setProperty(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
p.setProperty(IConfig.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);

// Telemetry is actually deleted from the code base, but just set the flag here to be sure.
p.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
p.setProperty(IConfig.ENABLE_TELEMETRY_NAME, "false");

p.setProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "false");

return p;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,15 @@ private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS
collector.add(sub);
}
}
payload.retain(collector.countBatches());

int subscriptionCount = collector.countBatches();
if (subscriptionCount <= 0) {
// no matching subscriptions, clean exit
LOG.trace("No matching subscriptions for topic: {}", topic);
return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
}

payload.retain(subscriptionCount);

List<RouteResult> publishResults = collector.routeBatchedPublishes((batch) -> {
publishToSession(payload, topic, batch, publishingQos);
Expand Down

0 comments on commit 10d8671

Please sign in to comment.