From 0afd4f9a4c7f2409f158d0a52d005ca6829a9dec Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 2 Dec 2024 17:33:46 +0100 Subject: [PATCH] ruby: Implement a buffer pool The trilogy client eagerly allocate a 32kiB buffer, and grows it as needed. It's never freed not shrunk until the connection is closed. Since by default the MySQL `max_allowed_packet` is 16MiB, long living connections will progressively grow to that size. For basic usage it's not a big deal, but some applications may have dozens if not hundreds of connections that are mostly idle. A common case being multi-tenant applications with horizontal sharding. In such cases you only ever query one database but have open connections to many databases. This situation might lead to a lot of memory retained by trilogy connections and never really released, looking very much like a memory leak. This can be reproduced with a simple script: ```ruby require 'trilogy' connection_pool = [] 50.times do t = Trilogy.new(database: "test") t.query("select '#{"a" * 16_000_000}' as a") connection_pool << t end puts "#{`ps -o rss= -p #{$$}`} kiB" ``` ``` $ ruby /tmp/trilogy-leak.rb 927120 kiB ``` If we instead take over the buffer lifetime management, we can implement some pooling for the buffers, we can limit the total number of buffer to as many connections are actually in use concurrently. The same reproduction script with the current branch: ``` $ ruby -Ilib:ext /tmp/trilogy-leak.rb 108144 kiB ``` --- contrib/ruby/ext/trilogy-ruby/cext.c | 232 ++++++++++++++++++++++- contrib/ruby/ext/trilogy-ruby/extconf.rb | 2 +- contrib/ruby/test/client_test.rb | 8 + inc/trilogy/client.h | 9 + inc/trilogy/error.h | 3 +- src/buffer.c | 11 +- src/client.c | 12 +- 7 files changed, 265 insertions(+), 12 deletions(-) diff --git a/contrib/ruby/ext/trilogy-ruby/cext.c b/contrib/ruby/ext/trilogy-ruby/cext.c index 54dc8168..70d731a8 100644 --- a/contrib/ruby/ext/trilogy-ruby/cext.c +++ b/contrib/ruby/ext/trilogy-ruby/cext.c @@ -15,6 +15,161 @@ #include "trilogy-ruby.h" +typedef struct _buffer_pool_entry_struct { + size_t cap; + uint8_t *buff; +} buffer_pool_entry; + +typedef struct _buffer_pool_struct { + size_t capa; + size_t len; + buffer_pool_entry *entries; +} buffer_pool; + +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY +#include +static rb_atomic_t buffer_pool_max_size = 8; +#else +static unsigned int buffer_pool_max_size = 8; +static VALUE _global_buffer_pool = Qnil; +#endif + +static void buffer_pool_free(void *data) +{ +#ifndef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY + _global_buffer_pool = Qnil; +#endif + + buffer_pool *pool = (buffer_pool *)data; + if (pool->capa) { + for (size_t index = 0; index < pool->len; index++) { + xfree(pool->entries[index].buff); + } + xfree(pool->entries); + } + xfree(pool); +} + +static size_t buffer_pool_memsize(const void *data) +{ + const buffer_pool *pool = (const buffer_pool *)data; + + size_t memsize = sizeof(buffer_pool) + sizeof(buffer_pool_entry) * pool->capa; + + if (pool->capa) { + for (size_t index = 0; index < pool->len; index++) { + memsize += pool->entries[index].cap; + } + } + + return memsize; +} + +static const rb_data_type_t buffer_pool_type = { + .wrap_struct_name = "trilogy/buffer_pool", + .function = { + .dmark = NULL, + .dfree = buffer_pool_free, + .dsize = buffer_pool_memsize, + }, + .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED +}; + +static VALUE create_rb_buffer_pool(void) +{ + buffer_pool *pool; + return TypedData_Make_Struct(Qfalse, buffer_pool, &buffer_pool_type, pool); +} + +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY +#include +static rb_ractor_local_key_t buffer_pool_key; + +static VALUE get_rb_buffer_pool(bool create) +{ + VALUE pool; + if (!rb_ractor_local_storage_value_lookup(buffer_pool_key, &pool) && create) { + pool = create_rb_buffer_pool(); + rb_ractor_local_storage_value_set(buffer_pool_key, pool); + } + return pool; +} +#else +static VALUE get_rb_buffer_pool(bool create) +{ + if (NIL_P(_global_buffer_pool) && create) { + _global_buffer_pool = create_rb_buffer_pool(); + } + return _global_buffer_pool; +} +#endif + +static inline buffer_pool *get_buffer_pool(bool create) +{ + buffer_pool *pool; + VALUE rb_pool = get_rb_buffer_pool(create); + if (NIL_P(rb_pool)) { + return NULL; + } + TypedData_Get_Struct(rb_pool, buffer_pool, &buffer_pool_type, pool); + return pool; +} + +static void buffer_checkout(trilogy_buffer_t *buffer, size_t initial_capacity) +{ + buffer_pool * pool = get_buffer_pool(true); + if (pool->len) { + pool->len--; + buffer->buff = pool->entries[pool->len].buff; + buffer->cap = pool->entries[pool->len].cap; + } else { + buffer->buff = RB_ALLOC_N(uint8_t, initial_capacity); + buffer->cap = initial_capacity; + } +} + +static bool buffer_checkin(trilogy_buffer_t *buffer) +{ + buffer_pool * pool = get_buffer_pool(true); + + if (pool->len >= buffer_pool_max_size) { + xfree(buffer->buff); + buffer->buff = NULL; + buffer->cap = 0; + return false; + } + + if (!pool->capa) { + pool->entries = RB_ALLOC_N(buffer_pool_entry, 16); + pool->capa = 16; + } else if (pool->len >= pool->capa) { + pool->capa *= 2; + RB_REALLOC_N(pool->entries, buffer_pool_entry, pool->capa); + } + + pool->entries[pool->len].buff = buffer->buff; + pool->entries[pool->len].cap = buffer->cap; + pool->len++; + + buffer->buff = NULL; + buffer->cap = 0; + + return true; +} + +static bool buffer_checkin_no_alloc(trilogy_buffer_t *buffer) +{ + if (get_buffer_pool(false)) { + return buffer_checkin(buffer); + } + + // The pool was freed, we're likely during Ruby shutdown + xfree(buffer->buff); + buffer->buff = NULL; + buffer->cap = 0; + return false; +} + VALUE Trilogy_CastError; static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError, Trilogy_ConnectionClosedError, @@ -34,6 +189,20 @@ struct trilogy_ctx { VALUE encoding; }; +static void rb_trilogy_acquire_buffer(struct trilogy_ctx *ctx) +{ + if (!ctx->conn.packet_buffer.buff) { + buffer_checkout(&ctx->conn.packet_buffer, TRILOGY_DEFAULT_BUF_SIZE); + } +} + +static void rb_trilogy_release_buffer(struct trilogy_ctx *ctx) +{ + if (ctx->conn.packet_buffer.buff) { + buffer_checkin(&ctx->conn.packet_buffer); + } +} + static void mark_trilogy(void *ptr) { struct trilogy_ctx *ctx = ptr; @@ -43,6 +212,11 @@ static void mark_trilogy(void *ptr) static void free_trilogy(void *ptr) { struct trilogy_ctx *ctx = ptr; + + if (ctx->conn.packet_buffer.buff) { + buffer_checkin_no_alloc(&ctx->conn.packet_buffer); + } + trilogy_free(&ctx->conn); xfree(ptr); } @@ -116,6 +290,8 @@ static void handle_trilogy_error(struct trilogy_ctx *ctx, int rc, const char *ms VALUE rbmsg = rb_vsprintf(msg, args); va_end(args); + rb_trilogy_release_buffer(ctx); + if (!trilogy_error_recoverable_p(rc)) { if (ctx->conn.socket != NULL) { // trilogy_sock_shutdown may affect errno @@ -178,7 +354,7 @@ static VALUE allocate_trilogy(VALUE klass) ctx->query_flags = TRILOGY_FLAGS_DEFAULT; - if (trilogy_init(&ctx->conn) < 0) { + if (trilogy_init_no_buffer(&ctx->conn) < 0) { VALUE rbmsg = rb_str_new("trilogy_init", 13); trilogy_syserr_fail_str(errno, rbmsg); } @@ -602,6 +778,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE connopt.tls_max_version = NUM2INT(val); } + rb_trilogy_acquire_buffer(ctx); + int rc = try_connect(ctx, &handshake, &connopt); if (rc != TRILOGY_OK) { if (connopt.path) { @@ -617,6 +795,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE authenticate(ctx, &handshake, connopt.ssl_mode); + rb_trilogy_release_buffer(ctx); + return Qnil; } @@ -626,6 +806,8 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database) StringValue(database); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_change_db_send(&ctx->conn, RSTRING_PTR(database), RSTRING_LEN(database)); if (rc == TRILOGY_AGAIN) { @@ -653,6 +835,8 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database) } } + rb_trilogy_release_buffer(ctx); + return Qtrue; } @@ -660,6 +844,8 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option) { struct trilogy_ctx *ctx = get_open_ctx(self); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_set_option_send(&ctx->conn, NUM2INT(option)); if (rc == TRILOGY_AGAIN) { @@ -687,6 +873,8 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option) } } + rb_trilogy_release_buffer(ctx); + return Qtrue; } @@ -892,6 +1080,10 @@ static VALUE execute_read_query_response(struct trilogy_ctx *ctx) handle_trilogy_error(ctx, args.rc, args.msg); } + if (!(ctx->conn.server_status & TRILOGY_SERVER_STATUS_MORE_RESULTS_EXISTS)) { + rb_trilogy_release_buffer(ctx); + } + return result; } @@ -924,6 +1116,8 @@ static VALUE rb_trilogy_query(VALUE self, VALUE query) StringValue(query); query = rb_str_export_to_enc(query, rb_to_encoding(ctx->encoding)); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_query_send(&ctx->conn, RSTRING_PTR(query), RSTRING_LEN(query)); if (rc == TRILOGY_AGAIN) { @@ -941,6 +1135,8 @@ static VALUE rb_trilogy_ping(VALUE self) { struct trilogy_ctx *ctx = get_open_ctx(self); + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_ping_send(&ctx->conn); if (rc == TRILOGY_AGAIN) { @@ -968,6 +1164,7 @@ static VALUE rb_trilogy_ping(VALUE self) } } + rb_trilogy_release_buffer(ctx); return Qtrue; } @@ -985,13 +1182,19 @@ static VALUE rb_trilogy_escape(VALUE self, VALUE str) const char *escaped_str; size_t escaped_len; + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_escape(&ctx->conn, RSTRING_PTR(str), RSTRING_LEN(str), &escaped_str, &escaped_len); if (rc < 0) { handle_trilogy_error(ctx, rc, "trilogy_escape"); } - return rb_enc_str_new(escaped_str, escaped_len, str_enc); + VALUE escaped_string = rb_enc_str_new(escaped_str, escaped_len, str_enc); + + rb_trilogy_release_buffer(ctx); + + return escaped_string; } static VALUE rb_trilogy_close(VALUE self) @@ -1002,6 +1205,8 @@ static VALUE rb_trilogy_close(VALUE self) return Qnil; } + rb_trilogy_acquire_buffer(ctx); + int rc = trilogy_close_send(&ctx->conn); if (rc == TRILOGY_AGAIN) { @@ -1027,6 +1232,8 @@ static VALUE rb_trilogy_close(VALUE self) // we must clear any SSL errors left in the queue from a read/write. ERR_clear_error(); + rb_trilogy_release_buffer(ctx); + trilogy_free(&ctx->conn); return Qnil; @@ -1132,15 +1339,34 @@ static VALUE rb_trilogy_server_status(VALUE self) { return LONG2FIX(get_open_ctx static VALUE rb_trilogy_server_version(VALUE self) { return rb_str_new_cstr(get_open_ctx(self)->server_version); } +static VALUE rb_trilogy_buffer_pool_size(VALUE klass) +{ + return UINT2NUM(buffer_pool_max_size); +} + +static VALUE rb_trilogy_buffer_pool_size_set(VALUE klass, VALUE size) +{ +#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY + RUBY_ATOMIC_SET(buffer_pool_max_size, NUM2UINT(size)); +#else + buffer_pool_max_size = NUM2UINT(size); +#endif + return size; +} + RUBY_FUNC_EXPORTED void Init_cext(void) { - #ifdef HAVE_RB_EXT_RACTOR_SAFE + #ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY rb_ext_ractor_safe(true); + buffer_pool_key = rb_ractor_local_storage_value_newkey(); #endif VALUE Trilogy = rb_const_get(rb_cObject, rb_intern("Trilogy")); rb_define_alloc_func(Trilogy, allocate_trilogy); + rb_define_singleton_method(Trilogy, "buffer_pool_size", rb_trilogy_buffer_pool_size, 0); + rb_define_singleton_method(Trilogy, "buffer_pool_size=", rb_trilogy_buffer_pool_size_set, 1); + rb_define_private_method(Trilogy, "_connect", rb_trilogy_connect, 3); rb_define_method(Trilogy, "change_db", rb_trilogy_change_db, 1); rb_define_alias(Trilogy, "select_db", "change_db"); diff --git a/contrib/ruby/ext/trilogy-ruby/extconf.rb b/contrib/ruby/ext/trilogy-ruby/extconf.rb index f618ef21..f96063bc 100644 --- a/contrib/ruby/ext/trilogy-ruby/extconf.rb +++ b/contrib/ruby/ext/trilogy-ruby/extconf.rb @@ -17,5 +17,5 @@ have_library("crypto", "CRYPTO_malloc") have_library("ssl", "SSL_new") have_func("rb_interned_str", "ruby.h") - +have_func("rb_ractor_local_storage_value_newkey", "ruby.h") create_makefile "trilogy/cext" diff --git a/contrib/ruby/test/client_test.rb b/contrib/ruby/test/client_test.rb index 6b968e98..382cda60 100644 --- a/contrib/ruby/test/client_test.rb +++ b/contrib/ruby/test/client_test.rb @@ -1131,4 +1131,12 @@ def test_error_classes_exclusively_match_subclasses assert_operator SystemCallError, :===, klass.new assert_operator Trilogy::ConnectionError, :===, klass.new end + + def test_buffer_pool_size_can_be_configured + assert_equal 8, Trilogy.buffer_pool_size + Trilogy.buffer_pool_size = 4 + assert_equal 4, Trilogy.buffer_pool_size + ensure + Trilogy.buffer_pool_size = 8 + end end diff --git a/inc/trilogy/client.h b/inc/trilogy/client.h index afce6273..e1e0568d 100644 --- a/inc/trilogy/client.h +++ b/inc/trilogy/client.h @@ -106,6 +106,15 @@ typedef struct { */ int trilogy_init(trilogy_conn_t *conn); +/* trilogy_init_no_buffer - Same as trilogy_init but doesn't allocate the packet buffer + * + * conn - A pre-allocated trilogy_conn_t pointer. + * + * Return values: + * TRILOGY_OK - The trilogy_conn_t pointer was properly initialized + */ +int trilogy_init_no_buffer(trilogy_conn_t *conn); + /* trilogy_flush_writes - Attempt to flush the internal packet buffer to the * network. This must be used if a `_send` function returns TRILOGY_AGAIN, and * should continue to be called until it returns a value other than diff --git a/inc/trilogy/error.h b/inc/trilogy/error.h index dd0be72e..59b65552 100644 --- a/inc/trilogy/error.h +++ b/inc/trilogy/error.h @@ -25,7 +25,8 @@ XX(TRILOGY_MAX_PACKET_EXCEEDED, -20) \ XX(TRILOGY_UNKNOWN_TYPE, -21) \ XX(TRILOGY_TIMEOUT, -22) \ - XX(TRILOGY_AUTH_PLUGIN_ERROR, -23) + XX(TRILOGY_AUTH_PLUGIN_ERROR, -23) \ + XX(TRILOGY_MEM_ERROR, -24) enum { #define XX(name, code) name = code, diff --git a/src/buffer.c b/src/buffer.c index a4c78393..250130cd 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -23,6 +23,9 @@ int trilogy_buffer_expand(trilogy_buffer_t *buffer, size_t needed) { // expand buffer if necessary if (buffer->len + needed > buffer->cap) { + if (buffer->buff == NULL) + return TRILOGY_MEM_ERROR; + size_t new_cap = buffer->cap; while (buffer->len + needed > new_cap) { @@ -59,7 +62,9 @@ int trilogy_buffer_putc(trilogy_buffer_t *buffer, uint8_t c) void trilogy_buffer_free(trilogy_buffer_t *buffer) { - free(buffer->buff); - buffer->buff = NULL; - buffer->len = buffer->cap = 0; + if (buffer->buff) { + free(buffer->buff); + buffer->buff = NULL; + buffer->len = buffer->cap = 0; + } } diff --git a/src/client.c b/src/client.c index fbd286c5..b05ace0c 100644 --- a/src/client.c +++ b/src/client.c @@ -117,10 +117,8 @@ static int begin_write(trilogy_conn_t *conn) return trilogy_flush_writes(conn); } -int trilogy_init(trilogy_conn_t *conn) +int trilogy_init_no_buffer(trilogy_conn_t *conn) { - int rc; - conn->affected_rows = 0; conn->last_insert_id = 0; conn->warning_count = 0; @@ -142,8 +140,14 @@ int trilogy_init(trilogy_conn_t *conn) trilogy_packet_parser_init(&conn->packet_parser, &packet_parser_callbacks); conn->packet_parser.user_data = &conn->packet_buffer; - CHECKED(trilogy_buffer_init(&conn->packet_buffer, TRILOGY_DEFAULT_BUF_SIZE)); + return TRILOGY_OK; +} +int trilogy_init(trilogy_conn_t *conn) +{ + int rc; + trilogy_init_no_buffer(conn); + CHECKED(trilogy_buffer_init(&conn->packet_buffer, TRILOGY_DEFAULT_BUF_SIZE)); return TRILOGY_OK; }