Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rb_io_wait function and cache io instance. #47

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 142 additions & 10 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
#include <ruby/encoding.h>
#include <ruby/io.h>
#include <ruby/thread.h>

#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>

#include <unistd.h>
#include <fcntl.h>

#include <trilogy.h>

#include "trilogy-ruby.h"

#if defined(HAVE_RB_IO_WAIT) && defined(RB_IO_OPEN_DESCRIPTOR) && defined(HAVE_RUBY_FIBER_SCHEDULER_H)
#define TRILOGY_RB_IO_WAIT
#endif

#ifdef TRILOGY_RB_IO_WAIT
#include <ruby/fiber/scheduler.h>
#endif

VALUE Trilogy_CastError;
static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError,
Trilogy_ConnectionClosedError,
Expand All @@ -28,26 +36,58 @@ static ID id_socket, id_host, id_port, id_username, id_password, id_found_rows,
id_from_code, id_from_errno, id_connection_options, id_max_allowed_packet;

struct trilogy_ctx {
VALUE self;

trilogy_conn_t conn;

#ifdef TRILOGY_RB_IO_WAIT
VALUE io;
#endif

char server_version[TRILOGY_SERVER_VERSION_SIZE + 1];
unsigned int query_flags;
VALUE encoding;
};

static void mark_trilogy(void *ptr)
static void trilogy_ctx_mark(void *ptr)
{
struct trilogy_ctx *ctx = ptr;
rb_gc_mark(ctx->encoding);

rb_gc_mark_movable(ctx->self);

#ifdef TRILOGY_RB_IO_WAIT
rb_gc_mark_movable(ctx->io);
#endif

rb_gc_mark_movable(ctx->encoding);
}

static void free_trilogy(void *ptr)
static void trilogy_ctx_compact(void *ptr)
{
struct trilogy_ctx *ctx = ptr;
trilogy_free(&ctx->conn);

ctx->self = rb_gc_location(ctx->self);

#ifdef TRILOGY_RB_IO_WAIT
if (RTEST(ctx->io))
ctx->io = rb_gc_location(ctx->io);
#endif

ctx->encoding = rb_gc_location(ctx->encoding);
}

static void trilogy_ctx_free(void *ptr)
{
struct trilogy_ctx *ctx = ptr;

if (ctx->conn.socket != NULL) {
trilogy_free(&ctx->conn);
}

xfree(ptr);
}

static size_t trilogy_memsize(const void *ptr) {
static size_t trilogy_ctx_memsize(const void *ptr) {
const struct trilogy_ctx *ctx = ptr;
size_t memsize = sizeof(struct trilogy_ctx);
if (ctx->conn.socket != NULL) {
Expand All @@ -60,9 +100,10 @@ static size_t trilogy_memsize(const void *ptr) {
static const rb_data_type_t trilogy_data_type = {
.wrap_struct_name = "trilogy",
.function = {
.dmark = mark_trilogy,
.dfree = free_trilogy,
.dsize = trilogy_memsize,
.dmark = trilogy_ctx_mark,
.dcompact = trilogy_ctx_compact,
.dfree = trilogy_ctx_free,
.dsize = trilogy_ctx_memsize,
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
Expand Down Expand Up @@ -176,6 +217,12 @@ static VALUE allocate_trilogy(VALUE klass)

VALUE obj = TypedData_Make_Struct(klass, struct trilogy_ctx, &trilogy_data_type, ctx);

RB_OBJ_WRITE(obj, &ctx->self, obj);

#ifdef TRILOGY_RB_IO_WAIT
ctx->io = Qnil;
#endif

ctx->query_flags = TRILOGY_FLAGS_DEFAULT;

if (trilogy_init(&ctx->conn) < 0) {
Expand Down Expand Up @@ -224,6 +271,66 @@ struct rb_trilogy_wait_args {
int rc;
};

#ifdef TRILOGY_RB_IO_WAIT
static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
{
struct trilogy_ctx *ctx = sock->user_data;
ioquatix marked this conversation as resolved.
Show resolved Hide resolved
struct timeval *timeout = NULL;
int wait_flag = 0;

switch (wait) {
case TRILOGY_WAIT_READ:
timeout = &sock->opts.read_timeout;
wait_flag = RUBY_IO_READABLE;
break;

case TRILOGY_WAIT_WRITE:
timeout = &sock->opts.write_timeout;
wait_flag = RUBY_IO_WRITABLE;
break;

case TRILOGY_WAIT_CONNECT:
// wait for connection to be writable
timeout = &sock->opts.connect_timeout;
if (timeout->tv_sec == 0 && timeout->tv_usec == 0) {
// We used to use the write timeout for this, so if a connect timeout isn't configured, default to that.
timeout = &sock->opts.write_timeout;
}
wait_flag = RUBY_IO_WRITABLE;
break;

case TRILOGY_WAIT_HANDSHAKE:
// wait for handshake packet on initial connection
timeout = &sock->opts.connect_timeout;
wait_flag = RUBY_IO_READABLE;
break;

default:
return TRILOGY_ERR;
}

if (ctx->io == Qnil) {
VALUE io = rb_io_open_descriptor(rb_cIO, trilogy_sock_fd(sock), FMODE_EXTERNAL, RUBY_Qnil, RUBY_Qnil, NULL);
RB_OBJ_WRITE(ctx->self, &ctx->io, io);
}

if (timeout->tv_sec == 0 && timeout->tv_usec == 0) {
timeout = NULL;
}

VALUE result = rb_io_wait(ctx->io, RB_INT2NUM(wait_flag), rb_fiber_scheduler_make_timeout(timeout));

if (result == RUBY_Qfalse) {
return TRILOGY_TIMEOUT;
}

if (RTEST(result)) {
return TRILOGY_OK;
} else {
return TRILOGY_SYSERR;
}
}
#else
static VALUE rb_trilogy_wait_protected(VALUE vargs) {
struct rb_trilogy_wait_args *args = (void *)vargs;

Expand Down Expand Up @@ -294,6 +401,7 @@ static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)

return TRILOGY_OK;
}
#endif

struct nogvl_sock_args {
int rc;
Expand All @@ -307,7 +415,7 @@ static void *no_gvl_resolve(void *data)
return NULL;
}

static int try_connect(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake, const trilogy_sockopt_t *opts)
static int try_connect(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake, trilogy_sockopt_t *opts)
{
trilogy_sock_t *sock = trilogy_sock_new(opts);
if (sock == NULL) {
Expand All @@ -316,6 +424,28 @@ static int try_connect(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake,

struct nogvl_sock_args args = {.rc = 0, .sock = sock};

// Attempt to resolve a non-numeric hostname using the fiber scheduler if possible.
#ifdef TRILOGY_RB_IO_WAIT
if (opts->hostname != NULL) {
VALUE scheduler = rb_fiber_scheduler_current();

if (scheduler != Qnil) {
VALUE addresses = rb_fiber_scheduler_address_resolve(scheduler, rb_str_new_cstr(opts->hostname));

if (RARRAY_LEN(addresses) == 0) {
return TRILOGY_DNS_ERR;
}

free(opts->hostname);
opts->hostname = NULL;
VALUE address = rb_ary_entry(addresses, 0);
StringValue(address);

opts->hostname = strndup(RSTRING_PTR(address), RSTRING_LEN(address));
}
}
#endif

// Do the DNS resolving with the GVL unlocked. At this point all
// configuration data is copied and available to the trilogy socket.
rb_thread_call_without_gvl(no_gvl_resolve, (void *)&args, RUBY_UBF_IO, NULL);
Expand All @@ -330,6 +460,8 @@ static int try_connect(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake,
/* replace the default wait callback with our GVL-aware callback so we can
escape the GVL on each wait operation without going through call_without_gvl */
sock->wait_cb = _cb_ruby_wait;
sock->user_data = ctx;

rc = trilogy_connect_send_socket(&ctx->conn, sock);
if (rc < 0) {
trilogy_sock_close(sock);
Expand Down
4 changes: 4 additions & 0 deletions contrib/ruby/ext/trilogy-ruby/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@
have_library("ssl", "SSL_new")
have_func("rb_interned_str", "ruby.h")

have_func("rb_io_wait", "ruby.h")
have_func("rb_io_open_descriptor", "ruby.h")
have_header("ruby/fiber/scheduler.h")

create_makefile "trilogy/cext"
1 change: 1 addition & 0 deletions inc/trilogy/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ typedef struct trilogy_sock_t {
int (*fd_cb)(struct trilogy_sock_t *self);

trilogy_sockopt_t opts;
void *user_data;
} trilogy_sock_t;

static inline int trilogy_sock_connect(trilogy_sock_t *sock) { return sock->connect_cb(sock); }
Expand Down