Skip to content

Commit

Permalink
Merge pull request #29 from CESNET/appfs-fuse-loop-terminate
Browse files Browse the repository at this point in the history
appFs - non-blocking fuse loop
  • Loading branch information
SiskaPavel authored Nov 5, 2024
2 parents d59681a + 83cf610 commit b1609fa
Showing 1 changed file with 93 additions and 5 deletions.
98 changes: 93 additions & 5 deletions src/appFs/appFs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fuse3/fuse_lowlevel.h>
#include <iostream>
#include <poll.h>
#include <telemetry.hpp>
#include <unistd.h>

Expand Down Expand Up @@ -316,13 +318,95 @@ static void setFuseOperations(struct fuse_operations* fuseOps)
fuseOps->release = releaseCallback;
}

static void runFuseLoop(struct fuse* fuse)
class AppFsFuseBuffer {
public:
AppFsFuseBuffer()
{
m_buffer.mem = nullptr;
m_buffer.size = 0;
}

~AppFsFuseBuffer()
{
// NOLINTNEXTLINE (cppcoreguidelines-no-malloc)
free(m_buffer.mem);
}

fuse_buf* getBuffer() { return &m_buffer; }

private:
fuse_buf m_buffer;
};

static void fuseLoop(struct fuse_session* session)
{
try {
const int ret = fuse_loop(fuse);
AppFsFuseBuffer buffer;

do {
const int ret = fuse_session_receive_buf(session, buffer.getBuffer());
if (ret == -EINTR) {
continue;
}
if (ret == -EAGAIN) {
return;
}
if (ret < 0) {
throw std::runtime_error("fuse_loop() is not running...");
throw std::runtime_error(
"fuse_session_receive_buf() has failed: " + std::to_string(ret));
}
fuse_session_process_buf(session, buffer.getBuffer());
return;
} while (true);
}

static void setupFuseSessionFd(struct fuse* fuse)
{
struct fuse_session* session = fuse_get_session(fuse);
const int sessionFd = fuse_session_fd(session);

int ret = fcntl(sessionFd, F_GETFL, 0);
if (ret < 0) {
throw std::runtime_error(
"failed to F_GETFL on fuse file-descriptor: " + std::to_string(ret));
}

ret = fcntl(sessionFd, F_SETFL, ret | O_NONBLOCK);
if (ret < 0) {
throw std::runtime_error(
"failed to F_SETFL on fuse file-descriptor: " + std::to_string(ret));
}
}

static void pollableFuseLoop(struct fuse* fuse)
{
struct fuse_session* session = fuse_get_session(fuse);

struct pollfd pfd;
pfd.fd = fuse_session_fd(session);
pfd.events = POLLIN;

while (fuse_session_exited(session) == 0) {
const int pollTimeout = 500;
const int pollResult = poll(&pfd, 1, pollTimeout);
if (pollResult == -1) {
// NOLINTNEXTLINE (concurrency-mt-unsafe, function is not thread safe)
throw std::runtime_error("poll failed: " + std::string(strerror(pollResult)));
}

if (pollResult == 0) {
continue;
}

if ((pfd.revents & POLLIN) != 0) {
fuseLoop(session);
}
}
}

static void tryCatchPollableFuseLoop(struct fuse* fuse)
{
try {
pollableFuseLoop(fuse);
} catch (const std::exception& ex) {
std::cerr << ex.what() << std::endl;
}
Expand Down Expand Up @@ -430,6 +514,8 @@ AppFsFuse::AppFsFuse(
if (ret < 0) {
throw std::runtime_error("fuse_mount() has failed.");
}

setupFuseSessionFd(m_fuse.get());
}

void AppFsFuse::start()
Expand All @@ -438,13 +524,15 @@ void AppFsFuse::start()
throw std::runtime_error("AppFsFuse::start() has already been called");
}

m_fuseThread = std::thread([&]() { runFuseLoop(m_fuse.get()); });
m_fuseThread = std::thread([&]() { tryCatchPollableFuseLoop(m_fuse.get()); });
m_isStarted = true;
}

void AppFsFuse::stop()
{
unmount();
fuse_exit(m_fuse.get());

if (m_fuseThread.joinable()) {
m_fuseThread.join();
}
Expand Down

0 comments on commit b1609fa

Please sign in to comment.