diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 2c09ce2..ef00891 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -271,7 +271,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr &client, const s subscriptionCount++; if (authResult == AuthResult::success && shareName.empty()) - giveClientRetainedMessages(ses, subtopics, qos); + giveClientRetainedMessages(ses, subtopics, qos, subscriptionIdentifier); } @@ -681,6 +681,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector &this_node, bool poundMode, const std::shared_ptr &session, const uint8_t max_qos, + const uint32_t subscription_identifier, const std::chrono::time_point &limit, std::deque &deferred, int &drop_count, int &processed_nodes_count) @@ -704,7 +705,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vectorwritePacket(copyFactory, max_qos, true, 0); // TODO: subscription identifier. From args? + const PacketDropReason drop_reason = session->writePacket(copyFactory, max_qos, true, subscription_identifier); if (drop_reason == PacketDropReason::BufferFull || drop_reason == PacketDropReason::QoSTODOSomethingSomething) { @@ -746,7 +747,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector &child = pair.second; - giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child, poundMode, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count); + giveClientRetainedMessagesRecursively( + cur_subtopic_it, end, child, poundMode, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count); } } } @@ -760,7 +762,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector &child = pair.second; - giveClientRetainedMessagesRecursively(next_subtopic, end, child, false, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count); + giveClientRetainedMessagesRecursively( + next_subtopic, end, child, false, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count); } } } @@ -801,7 +805,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector ses, const std::shared_ptr> subscribeSubtopicsCopy, std::shared_ptr> deferred, - int &requeue_count, uint &total_node_count, uint8_t max_qos) + int &requeue_count, uint &total_node_count, uint8_t max_qos, + const uint32_t subscription_identifier) { std::shared_ptr session = ses.lock(); @@ -840,7 +846,7 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we RWLockGuard locker(&retainedMessagesRwlock); locker.rdlock(); - giveClientRetainedMessagesRecursively(d.cur, d.end, node, d.poundMode, session, max_qos, new_limit, *deferred, drop_count, processed_nodes); + giveClientRetainedMessagesRecursively(d.cur, d.end, node, d.poundMode, session, max_qos, subscription_identifier, new_limit, *deferred, drop_count, processed_nodes); } total_node_count += processed_nodes; @@ -853,7 +859,8 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we if (!deferred->empty() && ++requeue_count < 100 && total_node_count < settings->retainedMessagesNodeLimit) { ThreadData *t = ThreadGlobals::getThreadData(); - auto again = std::bind(&SubscriptionStore::giveClientRetainedMessagesInitiateDeferred, this, ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos); + auto again = std::bind(&SubscriptionStore::giveClientRetainedMessagesInitiateDeferred, this, + ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos, subscription_identifier); /* * Adding a delayed retry is kind of a cheap way to avoid detecting when there is buffer or QoS space in the event loop, but that comes @@ -869,7 +876,7 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we } void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr &ses, - const std::vector &subscribeSubtopics, uint8_t max_qos) + const std::vector &subscribeSubtopics, uint8_t max_qos, const uint32_t subscriptionIdentifier) { if (!ses) return; @@ -904,7 +911,7 @@ void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr::const_iterator cur_subtopic_it, std::vector::const_iterator end, const std::shared_ptr &this_node, bool poundMode, const std::shared_ptr &session, const uint8_t max_qos, + const uint32_t subscription_identifier, const std::chrono::time_point &limit, std::deque &deferred, int &drop_count, int &processed_nodes_count); @@ -189,11 +190,12 @@ class SubscriptionStore void queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, const std::string &senderClientId, bool dollar = false); void giveClientRetainedMessages(const std::shared_ptr &ses, - const std::vector &subscribeSubtopics, uint8_t max_qos); + const std::vector &subscribeSubtopics, uint8_t max_qos, const uint32_t subscriptionIdentifier); void giveClientRetainedMessagesInitiateDeferred(const std::weak_ptr ses, const std::shared_ptr> subscribeSubtopicsCopy, std::shared_ptr> deferred, - int &requeue_count, uint &total_node_count, uint8_t max_qos); + int &requeue_count, uint &total_node_count, uint8_t max_qos, + const uint32_t subscription_identifier); void trySetRetainedMessages(const Publish &publish, const std::vector &subtopics); bool setRetainedMessage(const Publish &publish, const std::vector &subtopics, bool try_lock_fail=false);