Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Jan 17, 2024
1 parent abfce94 commit bbef794
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 73 deletions.
12 changes: 6 additions & 6 deletions io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ int fstack_socket(int domain, int type, int protocol) {
// linux_sockaddr is required by f-stack api, and has the same layout to sockaddr
static_assert(sizeof(linux_sockaddr) == sizeof(sockaddr));

int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout) {
int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, Timeout timeout) {
int err = 0;
while (true) {
int ret = ff_connect(sockfd, (linux_sockaddr*) addr, addrlen);
Expand Down Expand Up @@ -279,7 +279,7 @@ int fstack_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen) {
return ff_bind(sockfd, (linux_sockaddr*) addr, addrlen);
}

int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, uint64_t timeout) {
int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, Timeout timeout) {
return net::doio(LAMBDA(ff_accept(sockfd, (linux_sockaddr*) addr, addrlen)),
LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout)));
}
Expand All @@ -292,22 +292,22 @@ int fstack_shutdown(int sockfd, int how) {
return ff_shutdown(sockfd, how);
}

ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, uint64_t timeout) {
ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, Timeout timeout) {
return net::doio(LAMBDA(ff_send(sockfd, buf, count, flags)),
LAMBDA_TIMEOUT(g_engine->wait_for_fd_writable(sockfd, timeout)));
}

ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout) {
ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, Timeout timeout) {
return net::doio(LAMBDA(ff_sendmsg(sockfd, message, flags)),
LAMBDA_TIMEOUT(g_engine->wait_for_fd_writable(sockfd, timeout)));
}

ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, uint64_t timeout) {
ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, Timeout timeout) {
return net::doio(LAMBDA(ff_recv(sockfd, buf, count, flags)),
LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout)));
}

ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, uint64_t timeout) {
ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, Timeout timeout) {
return net::doio(LAMBDA(ff_recvmsg(sockfd, message, flags)),
LAMBDA_TIMEOUT(g_engine->wait_for_fd_readable(sockfd, timeout)));
}
Expand Down
12 changes: 6 additions & 6 deletions io/fstack-dpdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ int fstack_dpdk_fini();

int fstack_socket(int domain, int type, int protocol);

int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout);
int fstack_connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen, Timeout timeout = {});

int fstack_listen(int sockfd, int backlog);

int fstack_bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen);

int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, uint64_t timeout);
int fstack_accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen, Timeout timeout = {});

int fstack_close(int fd);

int fstack_shutdown(int sockfd, int how);

ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, uint64_t timeout);
ssize_t fstack_send(int sockfd, const void* buf, size_t count, int flags, Timeout timeout = {});

ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout);
ssize_t fstack_sendmsg(int sockfd, const struct msghdr* message, int flags, Timeout timeout = {});

ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, uint64_t timeout);
ssize_t fstack_recv(int sockfd, void* buf, size_t count, int flags, Timeout timeout = {});

ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, uint64_t timeout);
ssize_t fstack_recvmsg(int sockfd, struct msghdr* message, int flags, Timeout timeout = {});

int fstack_setsockopt(int socket, int level, int option_name, const void* option_value, socklen_t option_len);

Expand Down
48 changes: 24 additions & 24 deletions net/basic_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ int socket(int domain, int type, int protocol) {
#endif
}
int connect(int fd, const struct sockaddr *addr, socklen_t addrlen,
uint64_t timeout) {
Timeout timeout) {
int err = 0;
while (true) {
int ret = ::connect(fd, addr, addrlen);
Expand Down Expand Up @@ -110,7 +110,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen,
}

int accept(int fd, struct sockaddr *addr, socklen_t *addrlen,
uint64_t timeout) {
Timeout timeout) {
#ifdef __APPLE__
auto ret = (int)doio(LAMBDA(::accept(fd, addr, addrlen)),
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
Expand All @@ -125,11 +125,11 @@ int accept(int fd, struct sockaddr *addr, socklen_t *addrlen,
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
#endif
}
ssize_t read(int fd, void *buf, size_t count, uint64_t timeout) {
ssize_t read(int fd, void *buf, size_t count, Timeout timeout) {
return doio(LAMBDA(::read(fd, buf, count)),
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
}
ssize_t readv(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) {
ssize_t readv(int fd, const struct iovec *iov, int iovcnt, Timeout timeout) {
if (iovcnt <= 0) {
errno = EINVAL;
return -1;
Expand All @@ -139,7 +139,7 @@ ssize_t readv(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) {
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
}
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count,
uint64_t timeout) {
Timeout timeout) {
#ifdef __APPLE__
off_t len = count;
ssize_t ret =
Expand All @@ -152,7 +152,7 @@ ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count,
#endif
}

ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout) {
ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout) {
msghdr msg = {};
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
Expand All @@ -162,77 +162,77 @@ ssize_t sendmsg_zerocopy(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, ui
return ret;
}

ssize_t read_n(int fd, void *buf, size_t count, uint64_t timeout) {
ssize_t read_n(int fd, void *buf, size_t count, Timeout timeout) {
return doio_n(buf, count, LAMBDA_TIMEOUT(read(fd, buf, count, timeout)));
}
ssize_t sendfile_n(int out_fd, int in_fd, off_t *offset, size_t count,
uint64_t timeout) {
Timeout timeout) {
void* buf_unused = nullptr;
return doio_n(buf_unused, count,
LAMBDA_TIMEOUT(sendfile(out_fd, in_fd, offset, count, timeout)));
}

ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout) {
ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout) {
iovector_view v(iov, iovcnt);
return doiov_n(v, LAMBDA_TIMEOUT(readv(fd, v.iov, v.iovcnt, timeout)));
}

ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout) {
ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout) {
iovector_view v(iov, iovcnt);
return doiov_n(v, LAMBDA_TIMEOUT(sendmsg_zerocopy(fd, v.iov, v.iovcnt, num_calls, timeout)));
}

ssize_t send(int fd, const void *buf, size_t count, int flags, uint64_t timeout) {
ssize_t send(int fd, const void *buf, size_t count, int flags, Timeout timeout) {
return doio(LAMBDA(::send(fd, buf, count, flags)),
LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout)));
}

ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout) {
ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, Timeout timeout) {
return doio(LAMBDA(::sendmsg(fd, msg, flags)),
LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout)));
}

ssize_t recv(int fd, void* buf, size_t count, int flags, uint64_t timeout) {
ssize_t recv(int fd, void* buf, size_t count, int flags, Timeout timeout) {
return doio(LAMBDA(::recv(fd, buf, count, flags)),
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
}

ssize_t recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout) {
ssize_t recvmsg(int fd, struct msghdr* msg, int flags, Timeout timeout) {
return doio(LAMBDA(::recvmsg(fd, msg, flags)),
LAMBDA_TIMEOUT(photon::wait_for_fd_readable(fd, timeout)));
}

ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, uint64_t timeout) {
ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, Timeout timeout) {
msghdr msg = {};
msg.msg_iov = (struct iovec*)iov;
msg.msg_iovlen = iovcnt;
return doio(LAMBDA(::sendmsg(fd, &msg, flag | MSG_NOSIGNAL)),
LAMBDA_TIMEOUT(photon::wait_for_fd_writable(fd, timeout)));
}
ssize_t send_n(int fd, const void *buf, size_t count, int flag, uint64_t timeout) {
ssize_t send_n(int fd, const void *buf, size_t count, int flag, Timeout timeout) {
return doio_n((void *&)buf, count,
LAMBDA_TIMEOUT(send(fd, (const void*)buf, (size_t)count, flag, timeout)));
}

ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, uint64_t timeout) {
ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, Timeout timeout) {
iovector_view v(iov, iovcnt);
return doiov_n(v, LAMBDA_TIMEOUT(sendv(fd, (struct iovec*)v.iov, (int)v.iovcnt, flag, timeout)));
}
ssize_t write(int fd, const void *buf, size_t count, uint64_t timeout) {
ssize_t write(int fd, const void *buf, size_t count, Timeout timeout) {
return send(fd, buf, count, MSG_NOSIGNAL, timeout);
}
ssize_t writev(int fd, const struct iovec *iov, int iovcnt, uint64_t timeout) {
ssize_t writev(int fd, const struct iovec *iov, int iovcnt, Timeout timeout) {
return sendv(fd, iov, iovcnt, 0, timeout);
}
ssize_t write_n(int fd, const void *buf, size_t count, uint64_t timeout) {
ssize_t write_n(int fd, const void *buf, size_t count, Timeout timeout) {
return send_n(fd, buf, count, 0, timeout);
}
ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout) {
ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout) {
return sendv_n(fd, iov, iovcnt, 0, timeout);
}

ssize_t sendfile_fallback(ISocketStream* out_stream,
int in_fd, off_t offset, size_t count, uint64_t timeout) {
int in_fd, off_t offset, size_t count, Timeout timeout) {
char buf[64 * 1024];
void* ptr_unused = nullptr;
auto func = [&]() -> ssize_t {
Expand Down Expand Up @@ -352,7 +352,7 @@ static ssize_t recv_errqueue(int fd, uint32_t &ret_counter) {
return 0;
}

static int64_t read_counter(int fd, uint64_t timeout) {
static int64_t read_counter(int fd, Timeout timeout) {
uint32_t counter = 0;
auto ret = doio(LAMBDA(recv_errqueue(fd, counter)),
LAMBDA_TIMEOUT(photon::wait_for_fd_error(fd, timeout)));
Expand All @@ -365,7 +365,7 @@ inline bool is_counter_less_than(uint32_t left, uint32_t right) {
return (left < right) || (left > right && left > mid && right - left < mid);
}

ssize_t zerocopy_confirm(int fd, uint32_t num_calls, uint64_t timeout) {
ssize_t zerocopy_confirm(int fd, uint32_t num_calls, Timeout timeout) {
auto func = LAMBDA_TIMEOUT(read_counter(fd, timeout));
uint32_t counter = 0;
do {
Expand Down
58 changes: 30 additions & 28 deletions net/basic_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,64 +28,66 @@ namespace net {
int socket(int domain, int type, int protocol);

int connect(int fd, const struct sockaddr *addr, socklen_t addrlen,
uint64_t timeout = -1);
Timeout timeout = {});

int accept(int fd, struct sockaddr *addr, socklen_t *addrlen,
uint64_t timeout = -1);
Timeout timeout = {});

ssize_t send(int fd, const void* buf, size_t len, int flags, uint64_t timeout = -1);
ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout = -1);
ssize_t recv(int fd, void* buf, size_t count, int flags, uint64_t timeout = -1);
ssize_t recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout = -1);
ssize_t send(int fd, const void* buf, size_t len, int flags, Timeout timeout = {});
ssize_t sendmsg(int fd, const struct msghdr* msg, int flags, Timeout timeout = {});
ssize_t recv(int fd, void* buf, size_t count, int flags, Timeout timeout = {});
ssize_t recvmsg(int fd, struct msghdr* msg, int flags, Timeout timeout = {});

ssize_t read(int fd, void *buf, size_t count, uint64_t timeout = -1);
ssize_t read(int fd, void *buf, size_t count, Timeout timeout = {});

ssize_t readv(int fd, const struct iovec *iov, int iovcnt,
uint64_t timeout = -1);
Timeout timeout = {});

ssize_t write(int fd, const void *buf, size_t count, uint64_t timeout = -1);
ssize_t write(int fd, const void *buf, size_t count, Timeout timeout = {});

ssize_t writev(int fd, const struct iovec *iov, int iovcnt,
uint64_t timeout = -1);
Timeout timeout = {});

ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count,
uint64_t timeout = -1);
Timeout timeout = {});

ssize_t read_n(int fd, void *buf, size_t count, uint64_t timeout = -1);
ssize_t read_n(int fd, void *buf, size_t count, Timeout timeout = {});

ssize_t write_n(int fd, const void *buf, size_t count, uint64_t timeout = -1);
ssize_t write_n(int fd, const void *buf, size_t count, Timeout timeout = {});

ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout = -1);
ssize_t readv_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout = {});

ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, uint64_t timeout = -1);
ssize_t writev_n(int fd, struct iovec *iov, int iovcnt, Timeout timeout = {});

ssize_t sendfile_n(int out_fd, int in_fd, off_t *offset, size_t count,
uint64_t timeout = -1);
Timeout timeout = {});

class ISocketStream;
ssize_t sendfile_fallback(ISocketStream* out_stream, int in_fd, off_t offset, size_t count, uint64_t timeout = -1);
ssize_t sendfile_fallback(ISocketStream* out_stream, int in_fd, off_t offset, size_t count, Timeout timeout = {});

ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, uint64_t timeout = -1);
ssize_t zerocopy_n(int fd, iovec* iov, int iovcnt, uint32_t& num_calls, Timeout timeout = {});

ssize_t zerocopy_confirm(int fd, uint32_t num_calls, uint64_t timeout = -1);
ssize_t zerocopy_confirm(int fd, uint32_t num_calls, Timeout timeout = {});

ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, uint64_t timeout =-1);
ssize_t sendv(int fd, const struct iovec *iov, int iovcnt, int flag, Timeout timeout = {});

ssize_t send_n(int fd, const void *buf, size_t count, int flag, uint64_t timeout =-1);
ssize_t send_n(int fd, const void *buf, size_t count, int flag, Timeout timeout = {});

ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, uint64_t timeout =-1);
ssize_t sendv_n(int fd, struct iovec *iov, int iovcnt, int flag, Timeout timeout = {});

int set_socket_nonblocking(int fd);

int set_fd_nonblocking(int fd);

#define LAMBDA(expr) [&]() __INLINE__ { return expr; }
#define LAMBDA_TIMEOUT(expr) \
[&]() __INLINE__ { \
Timeout __tmo(timeout); \
DEFER(timeout = __tmo.timeout()); \
return expr; \
}
#define LAMBDA_TIMEOUT(expr) LAMBDA(expr)

// #define LAMBDA_TIMEOUT(expr) \
// [&]() __INLINE__ { \
// Timeout __tmo(timeout); \
// DEFER(timeout = __tmo.timeout()); \
// return expr; \
// }

template <typename IOCB, typename WAIT>
__FORCE_INLINE__ int doio(IOCB iocb, WAIT waitcb) {
Expand Down
18 changes: 9 additions & 9 deletions net/kernel_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,26 @@ class KernelSocketStream : public SocketStreamBase {
close();
}
ssize_t read(void* buf, size_t count) override {
uint64_t timeout = m_timeout;
Timeout timeout(m_timeout);
auto cb = LAMBDA_TIMEOUT(do_recv(fd, buf, count, 0, timeout));
return net::doio_n(buf, count, cb);
}
ssize_t readv(const iovec* iov, int iovcnt) override {
SmartCloneIOV<8> clone(iov, iovcnt);
iovector_view view(clone.ptr, iovcnt);
uint64_t timeout = m_timeout;
Timeout timeout(m_timeout);
auto cb = LAMBDA_TIMEOUT(do_recvmsg(fd, tmp_msg_hdr(view), 0, timeout));
return net::doiov_n(view, cb);
}
ssize_t write(const void* buf, size_t count) override {
uint64_t timeout = m_timeout;
Timeout timeout(m_timeout);
auto cb = LAMBDA_TIMEOUT(do_send(fd, buf, count, MSG_NOSIGNAL, timeout));
return net::doio_n((void*&) buf, count, cb);
}
ssize_t writev(const iovec* iov, int iovcnt) override {
SmartCloneIOV<8> clone(iov, iovcnt);
iovector_view view(clone.ptr, iovcnt);
uint64_t timeout = m_timeout;
Timeout timeout(m_timeout);
auto cb = LAMBDA_TIMEOUT(do_sendmsg(fd, tmp_msg_hdr(view), MSG_NOSIGNAL, timeout));
return net::doiov_n(view, cb);
}
Expand Down Expand Up @@ -389,11 +389,10 @@ class KernelSocketServer : public SocketServerBase {
int m_socket_family;
bool m_autoremove;
bool m_nonblocking;

Handler m_handler;
photon::thread* workth = nullptr;
bool m_block = false;
bool waiting = false;
Handler m_handler;
photon::thread* workth = nullptr;
int m_listen_fd = -1;

virtual KernelSocketStream* create_stream(int fd) {
Expand Down Expand Up @@ -468,9 +467,10 @@ class ZeroCopySocketStream : public KernelSocketStream {
}

ssize_t do_sendmsg(int sockfd, const struct msghdr* message, int flags, uint64_t timeout) override {
ssize_t n = photon::net::sendmsg(sockfd, message, flags | ZEROCOPY_FLAG, timeout);
Timeout tmo(timeout);
ssize_t n = photon::net::sendmsg(sockfd, message, flags | ZEROCOPY_FLAG, tmo);
m_num_calls++;
auto ret = zerocopy_confirm(sockfd, m_num_calls - 1, timeout);
auto ret = zerocopy_confirm(sockfd, m_num_calls - 1, tmo);
if (ret < 0)
return ret;
return n;
Expand Down

0 comments on commit bbef794

Please sign in to comment.