Skip to content

Commit

Permalink
WIP subscr identifiers in retained messages
Browse files Browse the repository at this point in the history
  • Loading branch information
halfgaar committed Nov 9, 2024
1 parent c4c7c77 commit 0fe3ace
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
29 changes: 18 additions & 11 deletions subscriptionstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ void SubscriptionStore::addSubscription(std::shared_ptr<Client> &client, const s
subscriptionCount++;

if (authResult == AuthResult::success && shareName.empty())
giveClientRetainedMessages(ses, subtopics, qos);
giveClientRetainedMessages(ses, subtopics, qos, subscriptionIdentifier);

}

Expand Down Expand Up @@ -681,6 +681,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
const std::shared_ptr<RetainedMessageNode> &this_node,
bool poundMode,
const std::shared_ptr<Session> &session, const uint8_t max_qos,
const uint32_t subscription_identifier,
const std::chrono::time_point<std::chrono::steady_clock> &limit,
std::deque<DeferredRetainedMessageNodeDelivery> &deferred,
int &drop_count, int &processed_nodes_count)
Expand All @@ -704,7 +705,7 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
if (auth.aclCheck(publish, publish.payload) == AuthResult::success)
{
PublishCopyFactory copyFactory(&publish);
const PacketDropReason drop_reason = session->writePacket(copyFactory, max_qos, true, 0); // TODO: subscription identifier. From args?
const PacketDropReason drop_reason = session->writePacket(copyFactory, max_qos, true, subscription_identifier);

if (drop_reason == PacketDropReason::BufferFull || drop_reason == PacketDropReason::QoSTODOSomethingSomething)
{
Expand Down Expand Up @@ -746,7 +747,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
else
{
std::shared_ptr<RetainedMessageNode> &child = pair.second;
giveClientRetainedMessagesRecursively(cur_subtopic_it, end, child, poundMode, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count);
giveClientRetainedMessagesRecursively(
cur_subtopic_it, end, child, poundMode, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count);
}
}
}
Expand All @@ -760,7 +762,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
if (cur_subtop == "#")
{
// We start at this node, so that a subscription on 'one/two/three/#' gives you 'one/two/three' too.
giveClientRetainedMessagesRecursively(next_subtopic, end, this_node, true, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count);
giveClientRetainedMessagesRecursively(
next_subtopic, end, this_node, true, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count);
}
else if (cur_subtop == "+")
{
Expand All @@ -780,7 +783,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
else
{
std::shared_ptr<RetainedMessageNode> &child = pair.second;
giveClientRetainedMessagesRecursively(next_subtopic, end, child, false, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count);
giveClientRetainedMessagesRecursively(
next_subtopic, end, child, false, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count);
}
}
}
Expand All @@ -801,7 +805,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
}
else
{
giveClientRetainedMessagesRecursively(next_subtopic, end, children, false, session, max_qos, limit, deferred, drop_count, ++processed_nodes_count);
giveClientRetainedMessagesRecursively(
next_subtopic, end, children, false, session, max_qos, subscription_identifier, limit, deferred, drop_count, ++processed_nodes_count);
}
}
}
Expand All @@ -810,7 +815,8 @@ void SubscriptionStore::giveClientRetainedMessagesRecursively(std::vector<std::s
void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::weak_ptr<Session> ses,
const std::shared_ptr<const std::vector<std::string>> subscribeSubtopicsCopy,
std::shared_ptr<std::deque<DeferredRetainedMessageNodeDelivery>> deferred,
int &requeue_count, uint &total_node_count, uint8_t max_qos)
int &requeue_count, uint &total_node_count, uint8_t max_qos,
const uint32_t subscription_identifier)
{
std::shared_ptr<Session> session = ses.lock();

Expand Down Expand Up @@ -840,7 +846,7 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we

RWLockGuard locker(&retainedMessagesRwlock);
locker.rdlock();
giveClientRetainedMessagesRecursively(d.cur, d.end, node, d.poundMode, session, max_qos, new_limit, *deferred, drop_count, processed_nodes);
giveClientRetainedMessagesRecursively(d.cur, d.end, node, d.poundMode, session, max_qos, subscription_identifier, new_limit, *deferred, drop_count, processed_nodes);
}

total_node_count += processed_nodes;
Expand All @@ -853,7 +859,8 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we
if (!deferred->empty() && ++requeue_count < 100 && total_node_count < settings->retainedMessagesNodeLimit)
{
ThreadData *t = ThreadGlobals::getThreadData();
auto again = std::bind(&SubscriptionStore::giveClientRetainedMessagesInitiateDeferred, this, ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos);
auto again = std::bind(&SubscriptionStore::giveClientRetainedMessagesInitiateDeferred, this,
ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos, subscription_identifier);

/*
* Adding a delayed retry is kind of a cheap way to avoid detecting when there is buffer or QoS space in the event loop, but that comes
Expand All @@ -869,7 +876,7 @@ void SubscriptionStore::giveClientRetainedMessagesInitiateDeferred(const std::we
}

void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos)
const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos, const uint32_t subscriptionIdentifier)
{
if (!ses)
return;
Expand Down Expand Up @@ -904,7 +911,7 @@ void SubscriptionStore::giveClientRetainedMessages(const std::shared_ptr<Session

int requeue_count = 0;
uint total_node_count = 0;
giveClientRetainedMessagesInitiateDeferred(ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos);
giveClientRetainedMessagesInitiateDeferred(ses, subscribeSubtopicsCopy, deferred, requeue_count, total_node_count, max_qos, subscriptionIdentifier);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions subscriptionstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class SubscriptionStore
static void giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it,
std::vector<std::string>::const_iterator end, const std::shared_ptr<RetainedMessageNode> &this_node, bool poundMode,
const std::shared_ptr<Session> &session, const uint8_t max_qos,
const uint32_t subscription_identifier,
const std::chrono::time_point<std::chrono::steady_clock> &limit,
std::deque<DeferredRetainedMessageNodeDelivery> &deferred,
int &drop_count, int &processed_nodes_count);
Expand Down Expand Up @@ -189,11 +190,12 @@ class SubscriptionStore
void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session);
void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, const std::string &senderClientId, bool dollar = false);
void giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos);
const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos, const uint32_t subscriptionIdentifier);
void giveClientRetainedMessagesInitiateDeferred(const std::weak_ptr<Session> ses,
const std::shared_ptr<const std::vector<std::string>> subscribeSubtopicsCopy,
std::shared_ptr<std::deque<DeferredRetainedMessageNodeDelivery>> deferred,
int &requeue_count, uint &total_node_count, uint8_t max_qos);
int &requeue_count, uint &total_node_count, uint8_t max_qos,
const uint32_t subscription_identifier);

void trySetRetainedMessages(const Publish &publish, const std::vector<std::string> &subtopics);
bool setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics, bool try_lock_fail=false);
Expand Down

0 comments on commit 0fe3ace

Please sign in to comment.