Skip to content

Commit

Permalink
Move experimental async io to folly/io/async
Browse files Browse the repository at this point in the history
Summary:
The following targets were moved to folly/io/async:
```
//folly/experimental/io:async_base
//folly/experimental/io:async_io
//folly/experimental/io:liburing
//folly/experimental/io:async_io_uring_socket
//folly/experimental/io:simple_async_io
//folly/experimental/io:epoll
//folly/experimental/io:epoll_backend
//folly/experimental/io:event_base_poller
//folly/experimental/io:mux_io_thread_pool_executor
//folly/experimental/io:io_uring
//folly/experimental/io:io_uring_backend
//folly/experimental/io:io_uring_provided_buffer_ring
//folly/experimental/io:io_uring_event
//folly/experimental/io:io_uring_event_base_local
//folly/experimental/io/test:async_io_test
//folly/experimental/io/test:async_io_uring_socket_test
//folly/experimental/io/test:epoll_backend_test
//folly/experimental/io/test:io_benchmark
//folly/experimental/io/test:io_uring_backend_bench
//folly/experimental/io/test:io_uring_backend_setup_test
//folly/experimental/io/test:io_uring_backend_test
//folly/experimental/io/test:io_uring_event_base_local_test
//folly/experimental/io/test:io_uring_event_test
//folly/experimental/io/test:io_uring_test
//folly/experimental/io/test:registered_fd_benchmark
//folly/experimental/io/test:simple_async_io_test
```

`arc f` was applied

This is a codemod. It was automatically generated and will be landed once it is approved and tests are passing in sandcastle.
You have been added as a reviewer by Sentinel or Butterfly.

p:meaio3.folly

Differential Revision: D59603836

fbshipit-source-id: 793e3a30ae5e27dfeda85d83ec7e11497cbb13d9
  • Loading branch information
Gownta authored and facebook-github-bot committed Jul 12, 2024
1 parent 1c8bc50 commit e5d54e8
Show file tree
Hide file tree
Showing 61 changed files with 3,871 additions and 3,454 deletions.
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1141,12 +1141,12 @@ if (BUILD_TESTS OR BUILD_BENCHMARKS)

if (${LIBAIO_FOUND})
folly_define_tests(
DIRECTORY experimental/io/test/
DIRECTORY io/async/test/
TEST async_io_test
SOURCES
AsyncIOTest.cpp
AsyncBaseTestLib.cpp
IoTestTempFileUtil.cpp
../../../experimental/io/test/AsyncBaseTestLib.cpp
../../../experimental/io/test/IoTestTempFileUtil.cpp
)
endif()

Expand Down
303 changes: 1 addition & 302 deletions folly/experimental/io/AsyncBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,305 +14,4 @@
* limitations under the License.
*/

#pragma once

#include <sys/types.h>

#include <atomic>
#include <cstdint>
#include <deque>
#include <functional>
#include <iosfwd>
#include <mutex>
#include <utility>
#include <vector>

#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/Range.h>
#include <folly/portability/SysUio.h>

namespace folly {
class AsyncIOOp;
class IoUringOp;
/**
* An AsyncBaseOp represents a pending operation. You may set a notification
* callback or you may use this class's methods directly.
*
* The op must remain allocated until it is completed or canceled.
*/
class AsyncBaseOp {
friend class AsyncBase;

public:
using NotificationCallback = folly::Function<void(AsyncBaseOp*)>;

explicit AsyncBaseOp(NotificationCallback cb = NotificationCallback());
AsyncBaseOp(const AsyncBaseOp&) = delete;
AsyncBaseOp& operator=(const AsyncBaseOp&) = delete;
virtual ~AsyncBaseOp();

enum class State {
UNINITIALIZED,
INITIALIZED,
PENDING,
COMPLETED,
CANCELED,
};

/**
* Initiate a read request.
*/
virtual void pread(int fd, void* buf, size_t size, off_t start) = 0;
void pread(int fd, Range<unsigned char*> range, off_t start) {
pread(fd, range.begin(), range.size(), start);
}
virtual void preadv(int fd, const iovec* iov, int iovcnt, off_t start) = 0;
virtual void pread(
int fd, void* buf, size_t size, off_t start, int /*buf_index*/) {
pread(fd, buf, size, start);
}

/**
* Initiate a write request.
*/
virtual void pwrite(int fd, const void* buf, size_t size, off_t start) = 0;
void pwrite(int fd, Range<const unsigned char*> range, off_t start) {
pwrite(fd, range.begin(), range.size(), start);
}
virtual void pwritev(int fd, const iovec* iov, int iovcnt, off_t start) = 0;
virtual void pwrite(
int fd, const void* buf, size_t size, off_t start, int /*buf_index*/) {
pwrite(fd, buf, size, start);
}

// we support only these subclasses
virtual AsyncIOOp* getAsyncIOOp() = 0;
virtual IoUringOp* getIoUringOp() = 0;

// ostream output
virtual void toStream(std::ostream& os) const = 0;

/**
* Return the current operation state.
*/
State state() const { return state_; }

/**
* user data get/set
*/
void* getUserData() const { return userData_; }

void setUserData(void* userData) { userData_ = userData; }

/**
* Reset the operation for reuse. It is an error to call reset() on
* an Op that is still pending.
*/
virtual void reset(NotificationCallback cb = NotificationCallback()) = 0;

void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }

/**
* Get the notification callback from the op.
*
* Note that this moves the callback out, leaving the callback in an
* uninitialized state! You must call setNotificationCallback before
* submitting the operation!
*/
NotificationCallback getNotificationCallback() { return std::move(cb_); }

/**
* Retrieve the result of this operation. Returns >=0 on success,
* -errno on failure (that is, using the Linux kernel error reporting
* conventions). Use checkKernelError (folly/Exception.h) on the result to
* throw a std::system_error in case of error instead.
*
* It is an error to call this if the Op hasn't completed.
*/
ssize_t result() const;

// debug helper
static std::string fd2name(int fd);

protected:
void init();
void start();
void unstart();
void complete(ssize_t result);
void cancel();

NotificationCallback cb_;
std::atomic<State> state_;
ssize_t result_;
void* userData_{nullptr};
};

std::ostream& operator<<(std::ostream& os, const AsyncBaseOp& op);
std::ostream& operator<<(std::ostream& os, AsyncBaseOp::State state);

/**
* Generic C++ interface around Linux IO(io_submit, io_uring)
*/
class AsyncBase {
public:
using Op = AsyncBaseOp;

enum PollMode {
NOT_POLLABLE,
POLLABLE,
};

/**
* Create an AsyncBase context capable of holding at most 'capacity' pending
* requests at the same time. As requests complete, others can be scheduled,
* as long as this limit is not exceeded.
*
* If pollMode is POLLABLE, pollFd() will return a file descriptor that
* can be passed to poll / epoll / select and will become readable when
* any IOs on this AsyncBase have completed. If you do this, you must use
* pollCompleted() instead of wait() -- do not read from the pollFd()
* file descriptor directly.
*
* You may use the same AsyncBase object from multiple threads, as long as
* there is only one concurrent caller of wait() / pollCompleted() / cancel()
* (perhaps by always calling it from the same thread, or by providing
* appropriate mutual exclusion). In this case, pending() returns a snapshot
* of the current number of pending requests.
*/
explicit AsyncBase(size_t capacity, PollMode pollMode = NOT_POLLABLE);
AsyncBase(const AsyncBase&) = delete;
AsyncBase& operator=(const AsyncBase&) = delete;
virtual ~AsyncBase();

/**
* Initialize context
*/
virtual void initializeContext() = 0;

/**
* Wait for at least minRequests to complete. Returns the requests that
* have completed; the returned range is valid until the next call to
* wait(). minRequests may be 0 to not block.
*/
Range<Op**> wait(size_t minRequests);

/**
* Cancel all pending requests and return them; the returned range is
* valid until the next call to cancel().
*/
Range<Op**> cancel();

/**
* Return the number of pending requests.
*/
size_t pending() const { return pending_; }

/**
* Return the maximum number of requests that can be kept outstanding
* at any one time.
*/
size_t capacity() const { return capacity_; }

/**
* Return the accumulative number of submitted I/O, since this object
* has been created.
*/
size_t totalSubmits() const { return submitted_; }

/**
* If POLLABLE, return a file descriptor that can be passed to poll / epoll
* and will become readable when any async IO operations have completed.
* If NOT_POLLABLE, return -1.
*/
int pollFd() const { return pollFd_; }

/**
* If POLLABLE, call instead of wait after the file descriptor returned
* by pollFd() became readable. The returned range is valid until the next
* call to pollCompleted().
*/
Range<Op**> pollCompleted();

/**
* Submit an op for execution.
*/
void submit(Op* op);

/**
* Submit a range of ops for execution
*/
int submit(Range<Op**> ops);

protected:
virtual int drainPollFd() = 0;
void complete(Op* op, ssize_t result) { op->complete(result); }

void cancel(Op* op) { op->cancel(); }

bool isInit() const { return init_.load(std::memory_order_relaxed); }

void decrementPending(size_t num = 1);
virtual int submitOne(AsyncBase::Op* op) = 0;
virtual int submitRange(Range<AsyncBase::Op**> ops) = 0;

enum class WaitType { COMPLETE, CANCEL };
virtual Range<AsyncBase::Op**> doWait(
WaitType type,
size_t minRequests,
size_t maxRequests,
std::vector<Op*>& result) = 0;

std::atomic<bool> init_{false};
std::mutex initMutex_;

std::atomic<size_t> pending_{0};
std::atomic<size_t> submitted_{0};
const size_t capacity_;
const PollMode pollMode_;
int pollFd_{-1};
std::vector<Op*> completed_;
std::vector<Op*> canceled_;
};

/**
* Wrapper around AsyncBase that allows you to schedule more requests than
* the AsyncBase's object capacity. Other requests are queued and processed
* in a FIFO order.
*/
class AsyncBaseQueue {
public:
/**
* Create a queue, using the given AsyncBase object.
* The AsyncBase object may not be used by anything else until the
* queue is destroyed.
*/
explicit AsyncBaseQueue(AsyncBase* asyncBase);
~AsyncBaseQueue();

size_t queued() const { return queue_.size(); }

/**
* Submit an op to the AsyncBase queue. The op will be queued until
* the AsyncBase object has room.
*/
void submit(AsyncBaseOp* op);

/**
* Submit a delayed op to the AsyncBase queue; this allows you to postpone
* creation of the Op (which may require allocating memory, etc) until
* the AsyncBase object has room.
*/
using OpFactory = std::function<AsyncBaseOp*()>;
void submit(OpFactory op);

private:
void onCompleted(AsyncBaseOp* op);
void maybeDequeue();

AsyncBase* asyncBase_;

std::deque<OpFactory> queue_;
};

} // namespace folly
#include <folly/io/async/AsyncBase.h>
Loading

0 comments on commit e5d54e8

Please sign in to comment.