diff --git a/contrib/ruby/ext/trilogy-ruby/cext.c b/contrib/ruby/ext/trilogy-ruby/cext.c index 54dc8168..70d731a8 100644 --- a/contrib/ruby/ext/trilogy-ruby/cext.c +++ b/contrib/ruby/ext/trilogy-ruby/cext.c @@ -15,6 +15,161 @@ #include "trilogy-ruby.h" +typedef struct _buffer_pool_entry_struct { + size_t cap; + uint8_t *buff; +} buffer_pool_entry; + +typedef struct _buffer_pool_struct { + size_t capa; + size_t len; + buffer_pool_entry *entries; +} buffer_pool; + +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY +#include +static rb_atomic_t buffer_pool_max_size = 8; +#else +static unsigned int buffer_pool_max_size = 8; +static VALUE _global_buffer_pool = Qnil; +#endif + +static void buffer_pool_free(void *data) +{ +#ifndef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY + _global_buffer_pool = Qnil; +#endif + + buffer_pool *pool = (buffer_pool *)data; + if (pool->capa) { + for (size_t index = 0; index < pool->len; index++) { + xfree(pool->entries[index].buff); + } + xfree(pool->entries); + } + xfree(pool); +} + +static size_t buffer_pool_memsize(const void *data) +{ + const buffer_pool *pool = (const buffer_pool *)data; + + size_t memsize = sizeof(buffer_pool) + sizeof(buffer_pool_entry) * pool->capa; + + if (pool->capa) { + for (size_t index = 0; index < pool->len; index++) { + memsize += pool->entries[index].cap; + } + } + + return memsize; +} + +static const rb_data_type_t buffer_pool_type = { + .wrap_struct_name = "trilogy/buffer_pool", + .function = { + .dmark = NULL, + .dfree = buffer_pool_free, + .dsize = buffer_pool_memsize, + }, + .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED +}; + +static VALUE create_rb_buffer_pool(void) +{ + buffer_pool *pool; + return TypedData_Make_Struct(Qfalse, buffer_pool, &buffer_pool_type, pool); +} + +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY +#include +static rb_ractor_local_key_t buffer_pool_key; + +static VALUE get_rb_buffer_pool(bool create) +{ + VALUE pool; + if (!rb_ractor_local_storage_value_lookup(buffer_pool_key, &pool) && create) { + pool = create_rb_buffer_pool(); + rb_ractor_local_storage_value_set(buffer_pool_key, pool); + } + return pool; +} +#else +static VALUE get_rb_buffer_pool(bool create) +{ + if (NIL_P(_global_buffer_pool) && create) { + _global_buffer_pool = create_rb_buffer_pool(); + } + return _global_buffer_pool; +} +#endif + +static inline buffer_pool *get_buffer_pool(bool create) +{ + buffer_pool *pool; + VALUE rb_pool = get_rb_buffer_pool(create); + if (NIL_P(rb_pool)) { + return NULL; + } + TypedData_Get_Struct(rb_pool, buffer_pool, &buffer_pool_type, pool); + return pool; +} + +static void buffer_checkout(trilogy_buffer_t *buffer, size_t initial_capacity) +{ + buffer_pool * pool = get_buffer_pool(true); + if (pool->len) { + pool->len--; + buffer->buff = pool->entries[pool->len].buff; + buffer->cap = pool->entries[pool->len].cap; + } else { + buffer->buff = RB_ALLOC_N(uint8_t, initial_capacity); + buffer->cap = initial_capacity; + } +} + +static bool buffer_checkin(trilogy_buffer_t *buffer) +{ + buffer_pool * pool = get_buffer_pool(true); + + if (pool->len >= buffer_pool_max_size) { + xfree(buffer->buff); + buffer->buff = NULL; + buffer->cap = 0; + return false; + } + + if (!pool->capa) { + pool->entries = RB_ALLOC_N(buffer_pool_entry, 16); + pool->capa = 16; + } else if (pool->len >= pool->capa) { + pool->capa *= 2; + RB_REALLOC_N(pool->entries, buffer_pool_entry, pool->capa); + } + + pool->entries[pool->len].buff = buffer->buff; + pool->entries[pool->len].cap = buffer->cap; + pool->len++; + + buffer->buff = NULL; + buffer->cap = 0; + + return true; +} + +static bool buffer_checkin_no_alloc(trilogy_buffer_t *buffer) +{ + if (get_buffer_pool(false)) { + return buffer_checkin(buffer); + } + + // The pool was freed, we're likely during Ruby shutdown + xfree(buffer->buff); + buffer->buff = NULL; + buffer->cap = 0; + return false; +} + VALUE Trilogy_CastError; static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError, Trilogy_ConnectionClosedError, @@ -34,6 +189,20 @@ struct trilogy_ctx { VALUE encoding; }; +static void rb_trilogy_acquire_buffer(struct trilogy_ctx *ctx) +{ + if (!ctx->conn.packet_buffer.buff) { + buffer_checkout(&ctx->conn.packet_buffer, TRILOGY_DEFAULT_BUF_SIZE); + } +} + +static void rb_trilogy_release_buffer(struct trilogy_ctx *ctx) +{ + if (ctx->conn.packet_buffer.buff) { + buffer_checkin(&ctx->conn.packet_buffer); + } +} + static void mark_trilogy(void *ptr) { struct trilogy_ctx *ctx = ptr; @@ -43,6 +212,11 @@ static void mark_trilogy(void *ptr) static void free_trilogy(void *ptr) { struct trilogy_ctx *ctx = ptr; + + if (ctx->conn.packet_buffer.buff) { + buffer_checkin_no_alloc(&ctx->conn.packet_buffer); + } + trilogy_free(&ctx->conn); xfree(ptr); } @@ -116,6 +290,8 @@ static void handle_trilogy_error(struct trilogy_ctx *ctx, int rc, const char *ms VALUE rbmsg = rb_vsprintf(msg, args); va_end(args); + rb_trilogy_release_buffer(ctx); + if (!trilogy_error_recoverable_p(rc)) { if (ctx->conn.socket != NULL) { // trilogy_sock_shutdown may affect errno @@ -178,7 +354,7 @@ static VALUE allocate_trilogy(VALUE klass) ctx->query_flags = TRILOGY_FLAGS_DEFAULT; - if (trilogy_init(&ctx->conn) < 0) { + if (trilogy_init_no_buffer(&ctx->conn) < 0) { VALUE rbmsg = rb_str_new("trilogy_init", 13); trilogy_syserr_fail_str(errno, rbmsg); } @@ -602,6 +778,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE connopt.tls_max_version = NUM2INT(val); } + rb_trilogy_acquire_buffer(ctx); + int rc = try_connect(ctx, &handshake, &connopt); if (rc != TRILOGY_OK) { if (connopt.path) { @@ -617,6 +795,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE authenticate(ctx, &handshake, connopt.ssl_mode); + rb_trilogy_release_buffer(ctx); + return Qnil; } @@ -626,6 +806,8 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database) StringValue(database); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_change_db_send(&ctx->conn, RSTRING_PTR(database), RSTRING_LEN(database)); if (rc == TRILOGY_AGAIN) { @@ -653,6 +835,8 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database) } } + rb_trilogy_release_buffer(ctx); + return Qtrue; } @@ -660,6 +844,8 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option) { struct trilogy_ctx *ctx = get_open_ctx(self); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_set_option_send(&ctx->conn, NUM2INT(option)); if (rc == TRILOGY_AGAIN) { @@ -687,6 +873,8 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option) } } + rb_trilogy_release_buffer(ctx); + return Qtrue; } @@ -892,6 +1080,10 @@ static VALUE execute_read_query_response(struct trilogy_ctx *ctx) handle_trilogy_error(ctx, args.rc, args.msg); } + if (!(ctx->conn.server_status & TRILOGY_SERVER_STATUS_MORE_RESULTS_EXISTS)) { + rb_trilogy_release_buffer(ctx); + } + return result; } @@ -924,6 +1116,8 @@ static VALUE rb_trilogy_query(VALUE self, VALUE query) StringValue(query); query = rb_str_export_to_enc(query, rb_to_encoding(ctx->encoding)); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_query_send(&ctx->conn, RSTRING_PTR(query), RSTRING_LEN(query)); if (rc == TRILOGY_AGAIN) { @@ -941,6 +1135,8 @@ static VALUE rb_trilogy_ping(VALUE self) { struct trilogy_ctx *ctx = get_open_ctx(self); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_ping_send(&ctx->conn); if (rc == TRILOGY_AGAIN) { @@ -968,6 +1164,7 @@ static VALUE rb_trilogy_ping(VALUE self) } } + rb_trilogy_release_buffer(ctx); return Qtrue; } @@ -985,13 +1182,19 @@ static VALUE rb_trilogy_escape(VALUE self, VALUE str) const char *escaped_str; size_t escaped_len; + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_escape(&ctx->conn, RSTRING_PTR(str), RSTRING_LEN(str), &escaped_str, &escaped_len); if (rc < 0) { handle_trilogy_error(ctx, rc, "trilogy_escape"); } - return rb_enc_str_new(escaped_str, escaped_len, str_enc); + VALUE escaped_string = rb_enc_str_new(escaped_str, escaped_len, str_enc); + + rb_trilogy_release_buffer(ctx); + + return escaped_string; } static VALUE rb_trilogy_close(VALUE self) @@ -1002,6 +1205,8 @@ static VALUE rb_trilogy_close(VALUE self) return Qnil; } + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_close_send(&ctx->conn); if (rc == TRILOGY_AGAIN) { @@ -1027,6 +1232,8 @@ static VALUE rb_trilogy_close(VALUE self) // we must clear any SSL errors left in the queue from a read/write. ERR_clear_error(); + rb_trilogy_release_buffer(ctx); + trilogy_free(&ctx->conn); return Qnil; @@ -1132,15 +1339,34 @@ static VALUE rb_trilogy_server_status(VALUE self) { return LONG2FIX(get_open_ctx static VALUE rb_trilogy_server_version(VALUE self) { return rb_str_new_cstr(get_open_ctx(self)->server_version); } +static VALUE rb_trilogy_buffer_pool_size(VALUE klass) +{ + return UINT2NUM(buffer_pool_max_size); +} + +static VALUE rb_trilogy_buffer_pool_size_set(VALUE klass, VALUE size) +{ +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY + RUBY_ATOMIC_SET(buffer_pool_max_size, NUM2UINT(size)); +#else + buffer_pool_max_size = NUM2UINT(size); +#endif + return size; +} + RUBY_FUNC_EXPORTED void Init_cext(void) { - #ifdef HAVE_RB_EXT_RACTOR_SAFE + #ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY rb_ext_ractor_safe(true); + buffer_pool_key = rb_ractor_local_storage_value_newkey(); #endif VALUE Trilogy = rb_const_get(rb_cObject, rb_intern("Trilogy")); rb_define_alloc_func(Trilogy, allocate_trilogy); + rb_define_singleton_method(Trilogy, "buffer_pool_size", rb_trilogy_buffer_pool_size, 0); + rb_define_singleton_method(Trilogy, "buffer_pool_size=", rb_trilogy_buffer_pool_size_set, 1); + rb_define_private_method(Trilogy, "_connect", rb_trilogy_connect, 3); rb_define_method(Trilogy, "change_db", rb_trilogy_change_db, 1); rb_define_alias(Trilogy, "select_db", "change_db"); diff --git a/contrib/ruby/ext/trilogy-ruby/extconf.rb b/contrib/ruby/ext/trilogy-ruby/extconf.rb index f618ef21..f96063bc 100644 --- a/contrib/ruby/ext/trilogy-ruby/extconf.rb +++ b/contrib/ruby/ext/trilogy-ruby/extconf.rb @@ -17,5 +17,5 @@ have_library("crypto", "CRYPTO_malloc") have_library("ssl", "SSL_new") have_func("rb_interned_str", "ruby.h") - +have_func("rb_ractor_local_storage_value_newkey", "ruby.h") create_makefile "trilogy/cext" diff --git a/contrib/ruby/test/client_test.rb b/contrib/ruby/test/client_test.rb index 6b968e98..382cda60 100644 --- a/contrib/ruby/test/client_test.rb +++ b/contrib/ruby/test/client_test.rb @@ -1131,4 +1131,12 @@ def test_error_classes_exclusively_match_subclasses assert_operator SystemCallError, :===, klass.new assert_operator Trilogy::ConnectionError, :===, klass.new end + + def test_buffer_pool_size_can_be_configured + assert_equal 8, Trilogy.buffer_pool_size + Trilogy.buffer_pool_size = 4 + assert_equal 4, Trilogy.buffer_pool_size + ensure + Trilogy.buffer_pool_size = 8 + end end diff --git a/inc/trilogy/client.h b/inc/trilogy/client.h index afce6273..e1e0568d 100644 --- a/inc/trilogy/client.h +++ b/inc/trilogy/client.h @@ -106,6 +106,15 @@ typedef struct { */ int trilogy_init(trilogy_conn_t *conn); +/* trilogy_init_no_buffer - Same as trilogy_init but doesn't allocate the packet buffer + * + * conn - A pre-allocated trilogy_conn_t pointer. + * + * Return values: + * TRILOGY_OK - The trilogy_conn_t pointer was properly initialized + */ +int trilogy_init_no_buffer(trilogy_conn_t *conn); + /* trilogy_flush_writes - Attempt to flush the internal packet buffer to the * network. This must be used if a `_send` function returns TRILOGY_AGAIN, and * should continue to be called until it returns a value other than diff --git a/inc/trilogy/error.h b/inc/trilogy/error.h index dd0be72e..59b65552 100644 --- a/inc/trilogy/error.h +++ b/inc/trilogy/error.h @@ -25,7 +25,8 @@ XX(TRILOGY_MAX_PACKET_EXCEEDED, -20) \ XX(TRILOGY_UNKNOWN_TYPE, -21) \ XX(TRILOGY_TIMEOUT, -22) \ - XX(TRILOGY_AUTH_PLUGIN_ERROR, -23) + XX(TRILOGY_AUTH_PLUGIN_ERROR, -23) \ + XX(TRILOGY_MEM_ERROR, -24) enum { #define XX(name, code) name = code, diff --git a/src/buffer.c b/src/buffer.c index a4c78393..250130cd 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -23,6 +23,9 @@ int trilogy_buffer_expand(trilogy_buffer_t *buffer, size_t needed) { // expand buffer if necessary if (buffer->len + needed > buffer->cap) { + if (buffer->buff == NULL) + return TRILOGY_MEM_ERROR; + size_t new_cap = buffer->cap; while (buffer->len + needed > new_cap) { @@ -59,7 +62,9 @@ int trilogy_buffer_putc(trilogy_buffer_t *buffer, uint8_t c) void trilogy_buffer_free(trilogy_buffer_t *buffer) { - free(buffer->buff); - buffer->buff = NULL; - buffer->len = buffer->cap = 0; + if (buffer->buff) { + free(buffer->buff); + buffer->buff = NULL; + buffer->len = buffer->cap = 0; + } } diff --git a/src/client.c b/src/client.c index fbd286c5..b05ace0c 100644 --- a/src/client.c +++ b/src/client.c @@ -117,10 +117,8 @@ static int begin_write(trilogy_conn_t *conn) return trilogy_flush_writes(conn); } -int trilogy_init(trilogy_conn_t *conn) +int trilogy_init_no_buffer(trilogy_conn_t *conn) { - int rc; - conn->affected_rows = 0; conn->last_insert_id = 0; conn->warning_count = 0; @@ -142,8 +140,14 @@ int trilogy_init(trilogy_conn_t *conn) trilogy_packet_parser_init(&conn->packet_parser, &packet_parser_callbacks); conn->packet_parser.user_data = &conn->packet_buffer; - CHECKED(trilogy_buffer_init(&conn->packet_buffer, TRILOGY_DEFAULT_BUF_SIZE)); + return TRILOGY_OK; +} +int trilogy_init(trilogy_conn_t *conn) +{ + int rc; + trilogy_init_no_buffer(conn); + CHECKED(trilogy_buffer_init(&conn->packet_buffer, TRILOGY_DEFAULT_BUF_SIZE)); return TRILOGY_OK; }