From 1997678beadf07a9abe7f8268d27407c0bc3db6d Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Tue, 16 Apr 2024 16:49:42 +0200 Subject: [PATCH] Prototype loading schema concurrently --- .../abstract/schema_statements.rb | 10 +++++---- .../abstract_mysql_adapter.rb | 6 ++--- .../mysql/schema_statements.rb | 16 +++++++------- .../connection_adapters/postgresql_adapter.rb | 4 ++-- .../connection_adapters/schema_cache.rb | 22 +++++++++++++++++++ .../connection_adapters/sqlite3_adapter.rb | 15 ++++++++----- .../connection_adapters/schema_cache_test.rb | 14 ++++++++++++ 7 files changed, 63 insertions(+), 24 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb index ec5e668748758..63d663611236d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb @@ -104,11 +104,13 @@ def index_exists?(table_name, column_name, **options) end # Returns an array of +Column+ objects for the table specified by +table_name+. - def columns(table_name) + def columns(table_name, async: false) table_name = table_name.to_s - definitions = column_definitions(table_name) - definitions.map do |field| - new_column_from_field(table_name, field, definitions) + result = column_definitions(table_name, async: async) + result.then do |definitions| + definitions.map do |field| + new_column_from_field(table_name, field, definitions) + end end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 8d1e6318bf28f..2a45f698a918f 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -964,10 +964,8 @@ def configure_connection internal_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}") end - def column_definitions(table_name) # :nodoc: - execute_and_free("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result| - each_hash(result) - end + def column_definitions(table_name, async: false) # :nodoc: + select_all("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA", async: async) end def create_table_info(table_name) # :nodoc: diff --git a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb index 40deb2bf198b3..d4b12e78d93d7 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb @@ -182,12 +182,12 @@ def default_type(table_name, field_name) end def new_column_from_field(table_name, field, _definitions) - field_name = field.fetch(:Field) - type_metadata = fetch_type_metadata(field[:Type], field[:Extra]) - default, default_function = field[:Default], nil + field_name = field.fetch("Field") + type_metadata = fetch_type_metadata(field["Type"], field["Extra"]) + default, default_function = field["Default"], nil if type_metadata.type == :datetime && /\ACURRENT_TIMESTAMP(?:\([0-6]?\))?\z/i.match?(default) - default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field[:Extra]) + default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field["Extra"]) default, default_function = nil, default elsif type_metadata.extra == "DEFAULT_GENERATED" default = +"(#{default})" unless default.start_with?("(") @@ -203,13 +203,13 @@ def new_column_from_field(table_name, field, _definitions) end MySQL::Column.new( - field[:Field], + field["Field"], default, type_metadata, - field[:Null] == "YES", + field["Null"] == "YES", default_function, - collation: field[:Collation], - comment: field[:Comment].presence + collation: field["Collation"], + comment: field["Comment"].presence ) end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index f1962ff494095..23962069d4109 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -1065,8 +1065,8 @@ def reconfigure_connection_timezone # Query implementation notes: # - format_type includes the column size constraint, e.g. varchar(50) # - ::regclass is a function that gives the id for a table name - def column_definitions(table_name) - query(<<~SQL, "SCHEMA") + def column_definitions(table_name, async: false) + query(<<~SQL, "SCHEMA", async: async) SELECT a.attname, format_type(a.atttypid, a.atttypmod), pg_get_expr(d.adbin, d.adrelid), a.attnotnull, a.atttypid, a.atttypmod, c.collname, col_description(a.attrelid, a.attnum) AS comment, diff --git a/activerecord/lib/active_record/connection_adapters/schema_cache.rb b/activerecord/lib/active_record/connection_adapters/schema_cache.rb index 184037ab25ce5..7ee532004a85c 100644 --- a/activerecord/lib/active_record/connection_adapters/schema_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/schema_cache.rb @@ -30,6 +30,10 @@ def load!(pool) self end + def preload_async(pool, sources) + cache(pool).preload_async(pool, sources) + end + def primary_keys(pool, table_name) cache(pool).primary_keys(pool, table_name) end @@ -174,6 +178,10 @@ def cached?(table_name) @schema_reflection.cached?(table_name) end + def preload_async(sources) + @schema_reflection.preload_async(@pool, sources) + end + def primary_keys(table_name) @schema_reflection.primary_keys(@pool, table_name) end @@ -334,6 +342,20 @@ def add(pool, table_name) end end + def preload_async(pool, sources) + columns = pool.with_connection do |connection| + sources.each_with_object({}) do |table_name, hash| + unless @columns.key?(table_name) + hash[table_name] = connection.columns(table_name, async: true) + end + end + end + columns.each do |table_name, column_info| + @columns[deep_deduplicate(table_name)] = deep_deduplicate(column_info.value) + rescue StatementInvalid + end + end + # Get the columns for a table def columns(pool, table_name) if ignored_table?(table_name) diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb index bb689eaf83309..c2e3269ac06d0 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb @@ -475,14 +475,17 @@ def bind_params_length 999 end - def table_structure(table_name) - structure = if supports_virtual_columns? - internal_exec_query("PRAGMA table_xinfo(#{quote_table_name(table_name)})", "SCHEMA") + def table_structure(table_name, async: false) + result = if supports_virtual_columns? + select_all("PRAGMA table_xinfo(#{quote_table_name(table_name)})", "SCHEMA", async: async) else - internal_exec_query("PRAGMA table_info(#{quote_table_name(table_name)})", "SCHEMA") + select_all("PRAGMA table_info(#{quote_table_name(table_name)})", "SCHEMA", async: async) + end + + result.then do |structure| + raise ActiveRecord::StatementInvalid.new("Could not find table '#{table_name}'", connection_pool: @pool) if structure.empty? + table_structure_with_collation(table_name, structure) end - raise ActiveRecord::StatementInvalid.new("Could not find table '#{table_name}'", connection_pool: @pool) if structure.empty? - table_structure_with_collation(table_name, structure) end alias column_definitions table_structure diff --git a/activerecord/test/cases/connection_adapters/schema_cache_test.rb b/activerecord/test/cases/connection_adapters/schema_cache_test.rb index f816d9e9f8b45..375897d4b46d9 100644 --- a/activerecord/test/cases/connection_adapters/schema_cache_test.rb +++ b/activerecord/test/cases/connection_adapters/schema_cache_test.rb @@ -28,6 +28,20 @@ def load_bound_reflection(filename, pool = @pool) end end + def test_preload_async + cache = new_bound_reflection + assert_not cache.cached?("courses") + + cache.preload_async(["courses", "omgponies", "professors", "colleges"]) + + assert cache.cached?("courses") + assert_not cache.cached?("omgponies") + assert cache.cached?("professors") + assert cache.cached?("colleges") + + assert_equal ["id", "name", "college_id"], cache.columns("courses").map(&:name) + end + def test_cached? cache = new_bound_reflection assert_not cache.cached?("courses")