Skip to content

Commit

Permalink
Prototype loading schema concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
byroot committed Apr 16, 2024
1 parent 64fadc6 commit 1997678
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ def index_exists?(table_name, column_name, **options)
end

# Returns an array of +Column+ objects for the table specified by +table_name+.
def columns(table_name)
def columns(table_name, async: false)
table_name = table_name.to_s
definitions = column_definitions(table_name)
definitions.map do |field|
new_column_from_field(table_name, field, definitions)
result = column_definitions(table_name, async: async)
result.then do |definitions|
definitions.map do |field|
new_column_from_field(table_name, field, definitions)
end
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,10 +964,8 @@ def configure_connection
internal_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}")
end

def column_definitions(table_name) # :nodoc:
execute_and_free("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result|
each_hash(result)
end
def column_definitions(table_name, async: false) # :nodoc:
select_all("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA", async: async)
end

def create_table_info(table_name) # :nodoc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ def default_type(table_name, field_name)
end

def new_column_from_field(table_name, field, _definitions)
field_name = field.fetch(:Field)
type_metadata = fetch_type_metadata(field[:Type], field[:Extra])
default, default_function = field[:Default], nil
field_name = field.fetch("Field")
type_metadata = fetch_type_metadata(field["Type"], field["Extra"])
default, default_function = field["Default"], nil

if type_metadata.type == :datetime && /\ACURRENT_TIMESTAMP(?:\([0-6]?\))?\z/i.match?(default)
default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field[:Extra])
default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field["Extra"])
default, default_function = nil, default
elsif type_metadata.extra == "DEFAULT_GENERATED"
default = +"(#{default})" unless default.start_with?("(")
Expand All @@ -203,13 +203,13 @@ def new_column_from_field(table_name, field, _definitions)
end

MySQL::Column.new(
field[:Field],
field["Field"],
default,
type_metadata,
field[:Null] == "YES",
field["Null"] == "YES",
default_function,
collation: field[:Collation],
comment: field[:Comment].presence
collation: field["Collation"],
comment: field["Comment"].presence
)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,8 +1065,8 @@ def reconfigure_connection_timezone
# Query implementation notes:
# - format_type includes the column size constraint, e.g. varchar(50)
# - ::regclass is a function that gives the id for a table name
def column_definitions(table_name)
query(<<~SQL, "SCHEMA")
def column_definitions(table_name, async: false)
query(<<~SQL, "SCHEMA", async: async)
SELECT a.attname, format_type(a.atttypid, a.atttypmod),
pg_get_expr(d.adbin, d.adrelid), a.attnotnull, a.atttypid, a.atttypmod,
c.collname, col_description(a.attrelid, a.attnum) AS comment,
Expand Down
22 changes: 22 additions & 0 deletions activerecord/lib/active_record/connection_adapters/schema_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def load!(pool)
self
end

def preload_async(pool, sources)
cache(pool).preload_async(pool, sources)
end

def primary_keys(pool, table_name)
cache(pool).primary_keys(pool, table_name)
end
Expand Down Expand Up @@ -174,6 +178,10 @@ def cached?(table_name)
@schema_reflection.cached?(table_name)
end

def preload_async(sources)
@schema_reflection.preload_async(@pool, sources)
end

def primary_keys(table_name)
@schema_reflection.primary_keys(@pool, table_name)
end
Expand Down Expand Up @@ -334,6 +342,20 @@ def add(pool, table_name)
end
end

def preload_async(pool, sources)
columns = pool.with_connection do |connection|
sources.each_with_object({}) do |table_name, hash|
unless @columns.key?(table_name)
hash[table_name] = connection.columns(table_name, async: true)
end
end
end
columns.each do |table_name, column_info|
@columns[deep_deduplicate(table_name)] = deep_deduplicate(column_info.value)
rescue StatementInvalid
end
end

# Get the columns for a table
def columns(pool, table_name)
if ignored_table?(table_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,14 +475,17 @@ def bind_params_length
999
end

def table_structure(table_name)
structure = if supports_virtual_columns?
internal_exec_query("PRAGMA table_xinfo(#{quote_table_name(table_name)})", "SCHEMA")
def table_structure(table_name, async: false)
result = if supports_virtual_columns?
select_all("PRAGMA table_xinfo(#{quote_table_name(table_name)})", "SCHEMA", async: async)
else
internal_exec_query("PRAGMA table_info(#{quote_table_name(table_name)})", "SCHEMA")
select_all("PRAGMA table_info(#{quote_table_name(table_name)})", "SCHEMA", async: async)
end

result.then do |structure|
raise ActiveRecord::StatementInvalid.new("Could not find table '#{table_name}'", connection_pool: @pool) if structure.empty?
table_structure_with_collation(table_name, structure)
end
raise ActiveRecord::StatementInvalid.new("Could not find table '#{table_name}'", connection_pool: @pool) if structure.empty?
table_structure_with_collation(table_name, structure)
end
alias column_definitions table_structure

Expand Down
14 changes: 14 additions & 0 deletions activerecord/test/cases/connection_adapters/schema_cache_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ def load_bound_reflection(filename, pool = @pool)
end
end

def test_preload_async
cache = new_bound_reflection
assert_not cache.cached?("courses")

cache.preload_async(["courses", "omgponies", "professors", "colleges"])

assert cache.cached?("courses")
assert_not cache.cached?("omgponies")
assert cache.cached?("professors")
assert cache.cached?("colleges")

assert_equal ["id", "name", "college_id"], cache.columns("courses").map(&:name)
end

def test_cached?
cache = new_bound_reflection
assert_not cache.cached?("courses")
Expand Down

0 comments on commit 1997678

Please sign in to comment.