Skip to content

Commit

Permalink
Implement support for messages priority and using real-time priority …
Browse files Browse the repository at this point in the history
…for keep-alive messages.
  • Loading branch information
dchapyshev committed Jul 7, 2024
1 parent 7484dda commit 9f3e9d1
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 24 deletions.
15 changes: 8 additions & 7 deletions source/base/net/tcp_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ void TcpChannel::resume()
}

//--------------------------------------------------------------------------------------------------
void TcpChannel::send(uint8_t channel_id, ByteArray&& buffer)
void TcpChannel::send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority)
{
addWriteTask(WriteTask::Type::USER_DATA, channel_id, std::move(buffer));
addWriteTask(WriteTask::Type::USER_DATA, priority, channel_id, std::move(buffer));
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -626,12 +626,13 @@ void TcpChannel::onMessageReceived()
}

//--------------------------------------------------------------------------------------------------
void TcpChannel::addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArray&& data)
void TcpChannel::addWriteTask(
WriteTask::Type type, WriteTask::Priority priority, uint8_t channel_id, ByteArray&& data)
{
const bool schedule_write = write_queue_.empty();

// Add the buffer to the queue for sending.
write_queue_.emplace(type, channel_id, std::move(data));
write_queue_.emplace(type, priority, next_sequence_num_++, channel_id, std::move(data));

if (schedule_write)
doWrite();
Expand All @@ -640,7 +641,7 @@ void TcpChannel::addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArra
//--------------------------------------------------------------------------------------------------
void TcpChannel::doWrite()
{
const WriteTask& task = write_queue_.front();
const WriteTask& task = write_queue_.top();
const ByteArray& source_buffer = task.data();
const uint8_t channel_id = task.channelId();

Expand Down Expand Up @@ -723,7 +724,7 @@ void TcpChannel::onWrite(const std::error_code& error_code, size_t bytes_transfe
// Update TX statistics.
addTxBytes(bytes_transferred);

const WriteTask& task = write_queue_.front();
const WriteTask& task = write_queue_.top();
WriteTask::Type task_type = task.type();
uint8_t channel_id = task.channelId();
ByteArray buffer = std::move(task.data());
Expand Down Expand Up @@ -1072,7 +1073,7 @@ void TcpChannel::sendKeepAlive(uint8_t flags, const void* data, size_t size)
memcpy(buffer.data() + sizeof(uint8_t) + sizeof(header), data, size);

// Add a task to the queue.
addWriteTask(WriteTask::Type::SERVICE_DATA, 0, std::move(buffer));
addWriteTask(WriteTask::Type::SERVICE_DATA, WriteTask::Priority::REAL_TIME, 0, std::move(buffer));
}

} // namespace base
12 changes: 5 additions & 7 deletions source/base/net/tcp_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include <asio/ip/tcp.hpp>
#include <asio/high_resolution_timer.hpp>

#include <queue>

namespace base {

class TcpChannelProxy;
Expand Down Expand Up @@ -90,9 +88,8 @@ class TcpChannel final : public NetworkChannel
// After calling the method, reading new messages will continue.
void resume();

// Sending a message. The method call is thread safe. After the call, the message will be added
// to the queue to be sent.
void send(uint8_t channel_id, ByteArray&& buffer);
// Sending a message. After the call, the message will be added to the queue to be sent.
void send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority = WriteTask::Priority::NORMAL);

// Disable or enable the algorithm of Nagle.
bool setNoDelay(bool enable);
Expand Down Expand Up @@ -173,7 +170,7 @@ class TcpChannel final : public NetworkChannel
void onMessageWritten(uint8_t channel_id, ByteArray&& buffer);
void onMessageReceived();

void addWriteTask(WriteTask::Type type, uint8_t channel_id, ByteArray&& data);
void addWriteTask(WriteTask::Type type, WriteTask::Priority priority, uint8_t channel_id, ByteArray&& data);

void doWrite();
void onWrite(const std::error_code& error_code, size_t bytes_transferred);
Expand Down Expand Up @@ -212,7 +209,8 @@ class TcpChannel final : public NetworkChannel
std::unique_ptr<MessageEncryptor> encryptor_;
std::unique_ptr<MessageDecryptor> decryptor_;

std::queue<WriteTask> write_queue_;
int next_sequence_num_ = 0;
WriteQueue write_queue_;
VariableSizeWriter variable_size_writer_;
ByteArray write_buffer_;

Expand Down
7 changes: 4 additions & 3 deletions source/base/net/tcp_channel_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ TcpChannelProxy::TcpChannelProxy(
}

//--------------------------------------------------------------------------------------------------
void TcpChannelProxy::send(uint8_t channel_id, ByteArray&& buffer)
void TcpChannelProxy::send(uint8_t channel_id, ByteArray&& buffer, WriteTask::Priority priority)
{
bool schedule_write;

{
std::scoped_lock lock(incoming_queue_lock_);

schedule_write = incoming_queue_.empty();
incoming_queue_.emplace(WriteTask::Type::USER_DATA, channel_id, std::move(buffer));
incoming_queue_.emplace(
WriteTask::Type::USER_DATA, priority, next_sequence_num_++, channel_id, std::move(buffer));
}

if (!schedule_write)
Expand Down Expand Up @@ -71,7 +72,7 @@ void TcpChannelProxy::scheduleWrite()
}

//--------------------------------------------------------------------------------------------------
bool TcpChannelProxy::reloadWriteQueue(std::queue<WriteTask>* work_queue)
bool TcpChannelProxy::reloadWriteQueue(WriteQueue* work_queue)
{
if (!work_queue->empty())
return false;
Expand Down
10 changes: 6 additions & 4 deletions source/base/net/tcp_channel_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include "base/net/tcp_channel.h"

#include <shared_mutex>
#include <mutex>

namespace base {

Expand All @@ -30,7 +30,8 @@ class TaskRunner;
class TcpChannelProxy : public std::enable_shared_from_this<TcpChannelProxy>
{
public:
void send(uint8_t channel_id, ByteArray&& buffer);
void send(uint8_t channel_id, ByteArray&& buffer,
WriteTask::Priority priority = WriteTask::Priority::NORMAL);

private:
friend class TcpChannel;
Expand All @@ -40,13 +41,14 @@ class TcpChannelProxy : public std::enable_shared_from_this<TcpChannelProxy>
void willDestroyCurrentChannel();

void scheduleWrite();
bool reloadWriteQueue(std::queue<WriteTask>* work_queue);
bool reloadWriteQueue(WriteQueue* work_queue);

std::shared_ptr<TaskRunner> task_runner_;

TcpChannel* channel_;

std::queue<WriteTask> incoming_queue_;
int next_sequence_num_ = 0;
WriteQueue incoming_queue_;
std::mutex incoming_queue_lock_;

DISALLOW_COPY_AND_ASSIGN(TcpChannelProxy);
Expand Down
44 changes: 41 additions & 3 deletions source/base/net/write_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,70 @@

#include "base/memory/byte_array.h"

#include <queue>
#include <vector>

namespace base {

class WriteTask
{
public:
enum class Type { SERVICE_DATA, USER_DATA };

WriteTask(Type type, uint8_t channel_id, ByteArray&& data)
enum class Priority : int
{
REAL_TIME = 0,
HIGH = 1,
NORMAL = 2,
LOW = 3,
IDLE = 4
};

WriteTask(Type type, Priority priority, int sequence_num, uint8_t channel_id, ByteArray&& data)
: type_(type),
priority_(priority),
sequence_num_(sequence_num),
channel_id_(channel_id),
data_(std::move(data))
{
// Nothing
}

WriteTask(const WriteTask& other) = default;
WriteTask& operator=(const WriteTask& other) = default;

Type type() const { return type_; }
Priority priority() const { return priority_; }
int sequenceNum() const { return sequence_num_; }
uint8_t channelId() const { return channel_id_; }
const ByteArray& data() const { return data_; }
ByteArray& data() { return data_; }

private:
const Type type_;
const uint8_t channel_id_;
Type type_;
Priority priority_;
int sequence_num_;
uint8_t channel_id_;
ByteArray data_;
};

struct WriteTaskCompare
{
// Used to support sorting.
bool operator()(const WriteTask& first, const WriteTask& second)
{
if (first.priority() < second.priority())
return false;

if (first.priority() > second.priority())
return true;

return (first.sequenceNum() - second.sequenceNum()) > 0;
}
};

using WriteQueue = std::priority_queue<WriteTask, std::vector<WriteTask>, WriteTaskCompare>;

} // namespace base

#endif // BASE_NET_WRITE_TASK_H

0 comments on commit 9f3e9d1

Please sign in to comment.