Skip to content

Commit

Permalink
Use rb_io_wait function and cache io instance.
Browse files Browse the repository at this point in the history
# Conflicts:
#	contrib/ruby/ext/trilogy-ruby/cext.c
  • Loading branch information
ioquatix committed Oct 25, 2024
1 parent d22ed28 commit 8743738
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 9 deletions.
126 changes: 117 additions & 9 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,27 @@
#include <ruby/encoding.h>
#include <ruby/io.h>
#include <ruby/thread.h>

#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>

#include <unistd.h>
#include <fcntl.h>

#include <trilogy.h>

#include "trilogy-ruby.h"

#define TRILOGY_RB_TIMEOUT 1

#if defined(HAVE_RB_IO_WAIT) && defined(HAVE_RUBY_FIBER_SCHEDULER_H)
#define TRILOGY_RB_IO_WAIT
#endif

#ifdef TRILOGY_RB_IO_WAIT
#include <ruby/fiber/scheduler.h>
#endif

VALUE Trilogy_CastError;
static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError,
Trilogy_ConnectionClosedError,
Expand All @@ -28,26 +38,59 @@ static ID id_socket, id_host, id_port, id_username, id_password, id_found_rows,
id_from_code, id_from_errno, id_connection_options, id_max_allowed_packet;

struct trilogy_ctx {
VALUE self;

trilogy_conn_t conn;

#ifdef TRILOGY_RB_IO_WAIT
VALUE io;
#endif

char server_version[TRILOGY_SERVER_VERSION_SIZE + 1];
unsigned int query_flags;
VALUE encoding;
};

static void mark_trilogy(void *ptr)
static void trilogy_ctx_mark(void *ptr)
{
struct trilogy_ctx *ctx = ptr;
rb_gc_mark(ctx->encoding);

rb_gc_mark_movable(ctx->self);

#ifdef TRILOGY_RB_IO_WAIT
if (RTEST(ctx->io))
rb_gc_mark_movable(ctx->io);
#endif

rb_gc_mark_movable(ctx->encoding);
}

static void free_trilogy(void *ptr)
static void trilogy_ctx_compact(void *ptr)
{
struct trilogy_ctx *ctx = ptr;
trilogy_free(&ctx->conn);

rb_gc_location(ctx->self);

#ifdef TRILOGY_RB_IO_WAIT
if (RTEST(ctx->io))
ctx->io = rb_gc_location(ctx->io);
#endif

ctx->encoding = rb_gc_location(ctx->encoding);
}

static void trilogy_ctx_free(void *ptr)
{
struct trilogy_ctx *ctx = ptr;

if (ctx->conn.socket != NULL) {
trilogy_free(&ctx->conn);
}

xfree(ptr);
}

static size_t trilogy_memsize(const void *ptr) {
static size_t trilogy_ctx_memsize(const void *ptr) {
const struct trilogy_ctx *ctx = ptr;
size_t memsize = sizeof(struct trilogy_ctx);
if (ctx->conn.socket != NULL) {
Expand All @@ -60,9 +103,10 @@ static size_t trilogy_memsize(const void *ptr) {
static const rb_data_type_t trilogy_data_type = {
.wrap_struct_name = "trilogy",
.function = {
.dmark = mark_trilogy,
.dfree = free_trilogy,
.dsize = trilogy_memsize,
.dmark = trilogy_ctx_mark,
.dcompact = trilogy_ctx_compact,
.dfree = trilogy_ctx_free,
.dsize = trilogy_ctx_memsize,
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
Expand Down Expand Up @@ -176,6 +220,12 @@ static VALUE allocate_trilogy(VALUE klass)

VALUE obj = TypedData_Make_Struct(klass, struct trilogy_ctx, &trilogy_data_type, ctx);

RB_OBJ_WRITE(obj, &ctx->self, obj);

#ifdef TRILOGY_RB_IO_WAIT
ctx->io = Qnil;
#endif

ctx->query_flags = TRILOGY_FLAGS_DEFAULT;

if (trilogy_init(&ctx->conn) < 0) {
Expand Down Expand Up @@ -224,6 +274,61 @@ struct rb_trilogy_wait_args {
int rc;
};

#ifdef TRILOGY_RB_IO_WAIT
static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
{
struct trilogy_ctx *ctx = sock->user_data;
struct timeval *timeout = NULL;
int wait_flag = 0;

switch (wait) {
case TRILOGY_WAIT_READ:
timeout = &sock->opts.read_timeout;
wait_flag = RUBY_IO_READABLE;
break;

case TRILOGY_WAIT_WRITE:
timeout = &sock->opts.write_timeout;
wait_flag = RUBY_IO_WRITABLE;
break;

case TRILOGY_WAIT_CONNECT:
// wait for connection to be writable
timeout = &sock->opts.connect_timeout;
if (timeout->tv_sec == 0 && timeout->tv_usec == 0) {
// We used to use the write timeout for this, so if a connect timeout isn't configured, default to that.
timeout = &sock->opts.write_timeout;
}
wait_flag = RUBY_IO_WRITABLE;
break;

case TRILOGY_WAIT_HANDSHAKE:
// wait for handshake packet on initial connection
timeout = &sock->opts.connect_timeout;
wait_flag = RUBY_IO_READABLE;
break;

default:
return TRILOGY_ERR;
}

if (ctx->io == Qnil) {
RB_OBJ_WRITE(ctx->self, &ctx->io, rb_io_fdopen(trilogy_sock_fd(sock), O_RDWR, NULL));
}

VALUE result = rb_io_wait(ctx->io, RB_INT2NUM(wait_flag), rb_fiber_scheduler_make_timeout(timeout));

if (result == RUBY_Qfalse) {
return TRILOGY_TIMEOUT;
}

if (RTEST(result)) {
return TRILOGY_OK;
} else {
return TRILOGY_SYSERR;
}
}
#else
static VALUE rb_trilogy_wait_protected(VALUE vargs) {
struct rb_trilogy_wait_args *args = (void *)vargs;

Expand Down Expand Up @@ -294,6 +399,7 @@ static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)

return TRILOGY_OK;
}
#endif

struct nogvl_sock_args {
int rc;
Expand Down Expand Up @@ -330,6 +436,8 @@ static int try_connect(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake,
/* replace the default wait callback with our GVL-aware callback so we can
escape the GVL on each wait operation without going through call_without_gvl */
sock->wait_cb = _cb_ruby_wait;
sock->user_data = ctx;

rc = trilogy_connect_send_socket(&ctx->conn, sock);
if (rc < 0) {
trilogy_sock_close(sock);
Expand Down
3 changes: 3 additions & 0 deletions contrib/ruby/ext/trilogy-ruby/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@
have_library("ssl", "SSL_new")
have_func("rb_interned_str", "ruby.h")

have_func("rb_io_wait", "ruby.h")
have_header("ruby/fiber/scheduler.h")

create_makefile "trilogy/cext"
1 change: 1 addition & 0 deletions inc/trilogy/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ typedef struct trilogy_sock_t {
int (*fd_cb)(struct trilogy_sock_t *self);

trilogy_sockopt_t opts;
void *user_data;
} trilogy_sock_t;

static inline int trilogy_sock_connect(trilogy_sock_t *sock) { return sock->connect_cb(sock); }
Expand Down

0 comments on commit 8743738

Please sign in to comment.