diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 71f17646405d2..d242ed34099e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1063,10 +1063,12 @@ public CompletableFuture unsubscribe(String subscriptionName) { // That creates deadlock. so, execute remove it in different thread. return CompletableFuture.runAsync(() -> { NonPersistentSubscription sub = subscriptions.remove(subscriptionName); - // preserve accumulative stats form removed subscription - SubscriptionStatsImpl stats = sub.getStats(); - bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); - msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); + if (sub != null) { + // preserve accumulative stats form removed subscription + SubscriptionStatsImpl stats = sub.getStats(); + bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); + msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); + } }, brokerService.executor()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 235ea527e693a..bc06ea44c31b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1050,10 +1050,12 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { void removeSubscription(String subscriptionName) { PersistentSubscription sub = subscriptions.remove(subscriptionName); - // preserve accumulative stats form removed subscription - SubscriptionStatsImpl stats = sub.getStats(false, false, false); - bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); - msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); + if (sub != null) { + // preserve accumulative stats form removed subscription + SubscriptionStatsImpl stats = sub.getStats(false, false, false); + bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); + msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); + } } /**