Skip to content

Commit

Permalink
chore(chunkserver): Apply pending readability refactor
Browse files Browse the repository at this point in the history
This commit applies old PR suggestions to improve the readability of
the ChunkserverEntry struct and the NetworkWorkerThread class:

- Replace old magic constants by using named constants.
- Give more context about selected constants.

Signed-off-by: guillex <guillex@leil.io>
  • Loading branch information
lgsilva3087 committed Jan 21, 2025
1 parent c1f6c26 commit 8e70b17
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
45 changes: 23 additions & 22 deletions src/chunkserver/chunkserver_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ ChunkserverEntry::createDetachedPacketWithOutputBuffer(
getReadOutputBufferPool().put(std::move(outPacket->outputBuffer));
}

return nullptr;
return kInvalidPacket;
}

return outPacket;
Expand Down Expand Up @@ -236,22 +236,22 @@ int ChunkserverEntry::initConnection() {
fwdSocket = tcpsocket();
if (fwdSocket < 0) {
safs_pretty_errlog(LOG_WARNING, "create socket, error");
return -1;
return kInitConnectionFailed;
}

if (tcpnonblock(fwdSocket) < 0) {
safs_pretty_errlog(LOG_WARNING, "set nonblock, error");
tcpclose(fwdSocket);
fwdSocket = -1;
return -1;
fwdSocket = kInvalidSocket;
return kInitConnectionFailed;
}

status = tcpnumconnect(fwdSocket, fwdServer.ip, fwdServer.port);
if (status < 0) {
safs_pretty_errlog(LOG_WARNING, "connect failed, error");
tcpclose(fwdSocket);
fwdSocket = -1;
return -1;
fwdSocket = kInvalidSocket;
return kInitConnectionFailed;
}

if (status == 0) { // connected immediately
Expand All @@ -262,17 +262,17 @@ int ChunkserverEntry::initConnection() {
connectStartTimeUSec = eventloop_utime();
}

return 0;
return kInitConnectionOK;
}

void ChunkserverEntry::retryConnect() {
TRACETHIS();
tcpclose(fwdSocket);
fwdSocket = -1;
fwdSocket = kInvalidSocket;
connectRetryCounter++;

if (connectRetryCounter < kConnectRetries) {
if (initConnection() < 0) {
if (initConnection() < kInitConnectionOK) {
fwdError();
return;
}
Expand Down Expand Up @@ -373,7 +373,7 @@ void ChunkserverEntry::readContinue() {
messageSerializer->serializePrefixOfCstoclReadData(
readDataPrefix, chunkId, offset, thisPartSize);
auto packet = createDetachedPacketWithOutputBuffer(readDataPrefix);
if (packet == nullptr) {
if (packet == kInvalidPacket) {
state = State::Close;
return;
}
Expand Down Expand Up @@ -408,7 +408,9 @@ void ChunkserverEntry::readContinue() {
}

void ChunkserverEntry::ping(const uint8_t *data, PacketHeader::Length length) {
if (length != 4) {
static constexpr uint32_t kExpectedPingSize = sizeof(uint32_t);

if (length != kExpectedPingSize) {
state = State::Close;
return;
}
Expand Down Expand Up @@ -636,7 +638,7 @@ void ChunkserverEntry::writeInit(const uint8_t *data, PacketHeader::Type type,
fwdStartPtr = fwdInitPacket.data();
fwdBytesLeft = fwdInitPacket.size();
connectRetryCounter = 0;
if (initConnection() < 0) {
if (initConnection() < kInitConnectionOK) {
std::vector<uint8_t> buffer;
messageSerializer->serializeCstoclWriteStatus(
buffer, chunkId, 0, SAUNAFS_ERROR_CANTCONNECT);
Expand Down Expand Up @@ -829,7 +831,7 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
// TODO(msulikowski) if we want to use a ConnectionPool, this the right
// place to put the connection to the pool.
tcpclose(fwdSocket);
fwdSocket = -1;
fwdSocket = kInvalidSocket;
}
inputPacket.useAlignedMemory = false;
state = State::Idle;
Expand Down Expand Up @@ -916,7 +918,7 @@ void ChunkserverEntry::hddListV2([[maybe_unused]] const uint8_t *data,
uint32_t opSize;
uint8_t *ptr;

if (length != 0) {
if (length != 0) { // This packet should not have any data
safs_pretty_syslog(LOG_NOTICE,
"CLTOCS_HDD_LIST_V2 - wrong size (%" PRIu32 "/0)",
length);
Expand Down Expand Up @@ -950,9 +952,9 @@ void ChunkserverEntry::generateChartPNGorCSV(const uint8_t *data,
uint8_t *ptr;
uint32_t len;

if (length != 4) {
safs_pretty_syslog(LOG_NOTICE,
"CLTOAN_CHART - wrong size (%" PRIu32 "/4)", length);
if (length != kGenerateChartExpectedPacketSize) {
safs::log_info("CLTOAN_CHART - wrong size ({}/{})", length,
kGenerateChartExpectedPacketSize);
state = State::Close;
return;
}
Expand All @@ -978,10 +980,9 @@ void ChunkserverEntry::generateChartData(const uint8_t *data, uint32_t length) {
uint8_t *ptr;
uint32_t len;

if (length != 4) {
safs_pretty_syslog(LOG_NOTICE,
"CLTOAN_CHART_DATA - wrong size (%" PRIu32 "/4)",
length);
if (length != kGenerateChartExpectedPacketSize) {
safs::log_info("CLTOAN_CHART_DATA - wrong size ({}/{})", length,
kGenerateChartExpectedPacketSize);
state = State::Close;
return;
}
Expand Down Expand Up @@ -1253,7 +1254,7 @@ void ChunkserverEntry::fwdRead() {
if (fwdInputPacket.bytesLeft > 0) {
return;
}
ptr = fwdHeaderBuffer + 4;
ptr = fwdHeaderBuffer + sizeof(PacketHeader::Type); // skip type
opSize = get32bit(&ptr);
if (opSize > kMaxPacketSize) {
safs_pretty_syslog(LOG_WARNING,
Expand Down
12 changes: 11 additions & 1 deletion src/chunkserver/chunkserver_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ constexpr uint32_t kIOAlignedPacketSize = disk::kIoBlockSize + SFSBLOCKSIZE;
constexpr uint32_t kIOAlignedOffset =
disk::kIoBlockSize - cltocs::writeData::kPrefixSize;

// Alias for better readability
#define kInvalidPacket nullptr

/**
* @brief Encapsulates the data associated with a packet.
*
Expand Down Expand Up @@ -109,14 +112,21 @@ struct ChunkserverEntry {
Closed // ready to be deleted
};

// Some constants to improve readability
static constexpr int kInvalidSocket = -1;
static constexpr int kInitConnectionOK = 0;
static constexpr int kInitConnectionFailed = -1;
static constexpr uint32_t kGenerateChartExpectedPacketSize =
sizeof(uint32_t);

void* workerJobPool; // Job pool assigned to a given network worker thread

ChunkserverEntry::State state = ChunkserverEntry::State::Idle;
ChunkserverEntry::Mode mode = ChunkserverEntry::Mode::Header;
ChunkserverEntry::Mode fwdMode = ChunkserverEntry::Mode::Header;

int sock;
int fwdSocket = -1; ///< forwarding socket for writing
int fwdSocket = kInvalidSocket; ///< forwarding socket for writing
uint64_t connectStartTimeUSec = 0; ///< for timeout and retry (usec)
uint8_t connectRetryCounter = 0; ///< for timeout and retry
NetworkAddress fwdServer; // the next server in write chain
Expand Down
5 changes: 4 additions & 1 deletion src/chunkserver/network_worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ NetworkWorkerThread::NetworkWorkerThread(uint32_t nrOfBgjobsWorkers,
TRACETHIS();
eassert(pipe(notify_pipe) != -1);
#ifdef F_SETPIPE_SZ
eassert(fcntl(notify_pipe[1], F_SETPIPE_SZ, 4096 * 32));
// Increase the pipe size to 128 KiB to handle a larger number of jobs
// without backpressure. On modern linux, the default pipe size is 64 KiB.
static constexpr int kPageAlignedPipeSize = 4096 * 32;
eassert(fcntl(notify_pipe[1], F_SETPIPE_SZ, kPageAlignedPipeSize));
#endif
bgJobPool_ =
job_pool_new(nrOfBgjobsWorkers, bgjobsCount, &bgJobPoolWakeUpFd_);
Expand Down

0 comments on commit 8e70b17

Please sign in to comment.