From 9f3e9d16c6acec01eab8d5a801b9f0ff14457672 Mon Sep 17 00:00:00 2001 From: Dmitry Chapyshev Date: Sun, 7 Jul 2024 17:42:17 +0200 Subject: [PATCH] Implement support for messages priority and using real-time priority for keep-alive messages. --- source/base/net/tcp_channel.cc | 15 +++++----- source/base/net/tcp_channel.h | 12 ++++---- source/base/net/tcp_channel_proxy.cc | 7 +++-- source/base/net/tcp_channel_proxy.h | 10 ++++--- source/base/net/write_task.h | 44 ++++++++++++++++++++++++++-- 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/source/base/net/tcp_channel.cc b/source/base/net/tcp_channel.cc index 703e09380..90c8560f5 100644 --- a/source/base/net/tcp_channel.cc +++ b/source/base/net/tcp_channel.cc @@ -335,9 +335,9 @@ void TcpChannel::resume() } //-------------------------------------------------------------------------------------------------- -void TcpChannel::send(uint8_t channel_id, ByteArray&& buffer) +void TcpChannel::send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority) { - addWriteTask(WriteTask::Type::USER_DATA, channel_id, std::move(buffer)); + addWriteTask(WriteTask::Type::USER_DATA, priority, channel_id, std::move(buffer)); } //-------------------------------------------------------------------------------------------------- @@ -626,12 +626,13 @@ void TcpChannel::onMessageReceived() } //-------------------------------------------------------------------------------------------------- -void TcpChannel::addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArray&& data) +void TcpChannel::addWriteTask( + WriteTask::Type type, WriteTask::Priority priority, uint8_t channel_id, ByteArray&& data) { const bool schedule_write = write_queue_.empty(); // Add the buffer to the queue for sending. - write_queue_.emplace(type, channel_id, std::move(data)); + write_queue_.emplace(type, priority, next_sequence_num_++, channel_id, std::move(data)); if (schedule_write) doWrite(); @@ -640,7 +641,7 @@ void TcpChannel::addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArra //-------------------------------------------------------------------------------------------------- void TcpChannel::doWrite() { - const WriteTask& task = write_queue_.front(); + const WriteTask& task = write_queue_.top(); const ByteArray& source_buffer = task.data(); const uint8_t channel_id = task.channelId(); @@ -723,7 +724,7 @@ void TcpChannel::onWrite(const std::error_code& error_code, size_t bytes_transfe // Update TX statistics. addTxBytes(bytes_transferred); - const WriteTask& task = write_queue_.front(); + const WriteTask& task = write_queue_.top(); WriteTask::Type task_type = task.type(); uint8_t channel_id = task.channelId(); ByteArray buffer = std::move(task.data()); @@ -1072,7 +1073,7 @@ void TcpChannel::sendKeepAlive(uint8_t flags, const void* data, size_t size) memcpy(buffer.data() + sizeof(uint8_t) + sizeof(header), data, size); // Add a task to the queue. - addWriteTask(WriteTask::Type::SERVICE_DATA, 0, std::move(buffer)); + addWriteTask(WriteTask::Type::SERVICE_DATA, WriteTask::Priority::REAL_TIME, 0, std::move(buffer)); } } // namespace base diff --git a/source/base/net/tcp_channel.h b/source/base/net/tcp_channel.h index 0cff98d0a..bd14a193f 100644 --- a/source/base/net/tcp_channel.h +++ b/source/base/net/tcp_channel.h @@ -29,8 +29,6 @@ #include #include -#include - namespace base { class TcpChannelProxy; @@ -90,9 +88,8 @@ class TcpChannel final : public NetworkChannel // After calling the method, reading new messages will continue. void resume(); - // Sending a message. The method call is thread safe. After the call, the message will be added - // to the queue to be sent. - void send(uint8_t channel_id, ByteArray&& buffer); + // Sending a message. After the call, the message will be added to the queue to be sent. + void send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority = WriteTask::Priority::NORMAL); // Disable or enable the algorithm of Nagle. bool setNoDelay(bool enable); @@ -173,7 +170,7 @@ class TcpChannel final : public NetworkChannel void onMessageWritten(uint8_t channel_id, ByteArray&& buffer); void onMessageReceived(); - void addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArray&& data); + void addWriteTask(WriteTask::Type type, WriteTask::Priority priority, uint8_t channel_id, ByteArray&& data); void doWrite(); void onWrite(const std::error_code& error_code, size_t bytes_transferred); @@ -212,7 +209,8 @@ class TcpChannel final : public NetworkChannel std::unique_ptr encryptor_; std::unique_ptr decryptor_; - std::queue write_queue_; + int next_sequence_num_ = 0; + WriteQueue write_queue_; VariableSizeWriter variable_size_writer_; ByteArray write_buffer_; diff --git a/source/base/net/tcp_channel_proxy.cc b/source/base/net/tcp_channel_proxy.cc index 49ec25bfa..2afbdc403 100644 --- a/source/base/net/tcp_channel_proxy.cc +++ b/source/base/net/tcp_channel_proxy.cc @@ -35,7 +35,7 @@ TcpChannelProxy::TcpChannelProxy( } //-------------------------------------------------------------------------------------------------- -void TcpChannelProxy::send(uint8_t channel_id, ByteArray&& buffer) +void TcpChannelProxy::send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority) { bool schedule_write; @@ -43,7 +43,8 @@ void TcpChannelProxy::send(uint8_t channel_id, ByteArray&& buffer) std::scoped_lock lock(incoming_queue_lock_); schedule_write = incoming_queue_.empty(); - incoming_queue_.emplace(WriteTask::Type::USER_DATA, channel_id, std::move(buffer)); + incoming_queue_.emplace( + WriteTask::Type::USER_DATA, priority, next_sequence_num_++, channel_id, std::move(buffer)); } if (!schedule_write) @@ -71,7 +72,7 @@ void TcpChannelProxy::scheduleWrite() } //-------------------------------------------------------------------------------------------------- -bool TcpChannelProxy::reloadWriteQueue(std::queue* work_queue) +bool TcpChannelProxy::reloadWriteQueue(WriteQueue* work_queue) { if (!work_queue->empty()) return false; diff --git a/source/base/net/tcp_channel_proxy.h b/source/base/net/tcp_channel_proxy.h index cf78e77a5..a4855f5aa 100644 --- a/source/base/net/tcp_channel_proxy.h +++ b/source/base/net/tcp_channel_proxy.h @@ -21,7 +21,7 @@ #include "base/net/tcp_channel.h" -#include +#include namespace base { @@ -30,7 +30,8 @@ class TaskRunner; class TcpChannelProxy : public std::enable_shared_from_this { public: - void send(uint8_t channel_id, ByteArray&& buffer); + void send(uint8_t channel_id, ByteArray&& buffer, + WriteTask::Priority priority = WriteTask::Priority::NORMAL); private: friend class TcpChannel; @@ -40,13 +41,14 @@ class TcpChannelProxy : public std::enable_shared_from_this void willDestroyCurrentChannel(); void scheduleWrite(); - bool reloadWriteQueue(std::queue* work_queue); + bool reloadWriteQueue(WriteQueue* work_queue); std::shared_ptr task_runner_; TcpChannel* channel_; - std::queue incoming_queue_; + int next_sequence_num_ = 0; + WriteQueue incoming_queue_; std::mutex incoming_queue_lock_; DISALLOW_COPY_AND_ASSIGN(TcpChannelProxy); diff --git a/source/base/net/write_task.h b/source/base/net/write_task.h index d759aeb99..0bb4699b9 100644 --- a/source/base/net/write_task.h +++ b/source/base/net/write_task.h @@ -21,6 +21,9 @@ #include "base/memory/byte_array.h" +#include +#include + namespace base { class WriteTask @@ -28,25 +31,60 @@ class WriteTask public: enum class Type { SERVICE_DATA, USER_DATA }; - WriteTask(Type type, uint8_t channel_id, ByteArray&& data) + enum class Priority : int + { + REAL_TIME = 0, + HIGH = 1, + NORMAL = 2, + LOW = 3, + IDLE = 4 + }; + + WriteTask(Type type, Priority priority, int sequence_num, uint8_t channel_id, ByteArray&& data) : type_(type), + priority_(priority), + sequence_num_(sequence_num), channel_id_(channel_id), data_(std::move(data)) { // Nothing } + WriteTask(const WriteTask& other) = default; + WriteTask& operator=(const WriteTask& other) = default; + Type type() const { return type_; } + Priority priority() const { return priority_; } + int sequenceNum() const { return sequence_num_; } uint8_t channelId() const { return channel_id_; } const ByteArray& data() const { return data_; } ByteArray& data() { return data_; } private: - const Type type_; - const uint8_t channel_id_; + Type type_; + Priority priority_; + int sequence_num_; + uint8_t channel_id_; ByteArray data_; }; +struct WriteTaskCompare +{ + // Used to support sorting. + bool operator()(const WriteTask& first, const WriteTask& second) + { + if (first.priority() < second.priority()) + return false; + + if (first.priority() > second.priority()) + return true; + + return (first.sequenceNum() - second.sequenceNum()) > 0; + } +}; + +using WriteQueue = std::priority_queue, WriteTaskCompare>; + } // namespace base #endif // BASE_NET_WRITE_TASK_H