Skip to content

Commit

Permalink
Refactor InternalMetadata, MigrationContext to belong to the pool
Browse files Browse the repository at this point in the history
Extracted from: rails#50793

Similar to the recent refactoring of schema caches, rather than to directly
hold a connection, they now hold a pool and checkout a connection when needed.
  • Loading branch information
byroot committed Feb 22, 2024
1 parent 85c58ff commit a918394
Show file tree
Hide file tree
Showing 31 changed files with 376 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ def initialize(pool_config)
@reaper.run
end

def migration_context # :nodoc:
MigrationContext.new(migrations_paths, schema_migration, internal_metadata)
end

def migrations_paths # :nodoc:
db_config.migrations_paths || Migrator.migrations_paths
end

def schema_migration # :nodoc:
SchemaMigration.new(self)
end

def internal_metadata # :nodoc:
InternalMetadata.new(self)
end

# Retrieve the connection associated with the current thread, or call
# #checkout to obtain one if necessary.
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def truncate(table_name, name = nil)
end

def truncate_tables(*table_names) # :nodoc:
table_names -= [schema_migration.table_name, internal_metadata.table_name]
table_names -= [pool.schema_migration.table_name, pool.internal_metadata.table_name]

return if table_names.empty?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ def check_constraint_exists?(table_name, **options)
end

def dump_schema_information # :nodoc:
versions = schema_migration.versions
versions = pool.schema_migration.versions
insert_versions_sql(versions) if versions.any?
end

Expand All @@ -1327,8 +1327,9 @@ def internal_string_options_for_primary_key # :nodoc:

def assume_migrated_upto_version(version)
version = version.to_i
sm_table = quote_table_name(schema_migration.table_name)
sm_table = quote_table_name(pool.schema_migration.table_name)

migration_context = pool.migration_context
migrated = migration_context.get_all_versions
versions = migration_context.migrations.map(&:version)

Expand Down Expand Up @@ -1838,7 +1839,7 @@ def remove_timestamps_for_alter(table_name, **options)
end

def insert_versions_sql(versions)
sm_table = quote_table_name(schema_migration.table_name)
sm_table = quote_table_name(pool.schema_migration.table_name)

if versions.is_a?(Array)
sql = +"INSERT INTO #{sm_table} (version) VALUES\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,6 @@ def replica?
@config[:replica] || false
end

def use_metadata_table?
@config.fetch(:use_metadata_table, true)
end

def connection_retries
(@config[:connection_retries] || 1).to_i
end
Expand Down Expand Up @@ -242,22 +238,6 @@ def preventing_writes?
connection_class.current_preventing_writes
end

def migrations_paths # :nodoc:
@config[:migrations_paths] || Migrator.migrations_paths
end

def migration_context # :nodoc:
MigrationContext.new(migrations_paths, schema_migration, internal_metadata)
end

def schema_migration # :nodoc:
SchemaMigration.new(self)
end

def internal_metadata # :nodoc:
InternalMetadata.new(self)
end

def prepared_statements?
@prepared_statements && !prepared_statements_disabled_cache.include?(object_id)
end
Expand Down Expand Up @@ -872,7 +852,7 @@ def check_version # :nodoc:
# numbered migration that has been executed, or 0 if no schema
# information is present / the database is empty.
def schema_version
migration_context.current_version
pool.migration_context.current_version
end

class << self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def for_current_env?
def schema_cache_path
raise NotImplementedError
end

def use_metadata_table?
raise NotImplementedError
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ def database_tasks? # :nodoc:
!replica? && !!configuration_hash.fetch(:database_tasks, true)
end

def use_metadata_table? # :nodoc:
configuration_hash.fetch(:use_metadata_table, true)
end

private
def schema_file_type(format)
case format
Expand Down
80 changes: 47 additions & 33 deletions activerecord/lib/active_record/internal_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ class InternalMetadata # :nodoc:
class NullInternalMetadata # :nodoc:
end

attr_reader :connection, :arel_table
attr_reader :arel_table

def initialize(connection)
@connection = connection
def initialize(pool)
@pool = pool
@arel_table = Arel::Table.new(table_name)
end

def enabled?
connection.use_metadata_table?
end

def primary_key
"key"
end
Expand All @@ -36,108 +32,126 @@ def table_name
"#{ActiveRecord::Base.table_name_prefix}#{ActiveRecord::Base.internal_metadata_table_name}#{ActiveRecord::Base.table_name_suffix}"
end

def enabled?
@pool.db_config.use_metadata_table?
end

def []=(key, value)
return unless enabled?

update_or_create_entry(key, value)
@pool.with_connection do |connection|
update_or_create_entry(connection, key, value)
end
end

def [](key)
return unless enabled?

if entry = select_entry(key)
entry[value_key]
@pool.with_connection do |connection|
if entry = select_entry(connection, key)
entry[value_key]
end
end
end

def delete_all_entries
dm = Arel::DeleteManager.new(arel_table)

connection.delete(dm, "#{self.class} Destroy")
@pool.with_connection do |connection|
connection.delete(dm, "#{self.class} Destroy")
end
end

def count
sm = Arel::SelectManager.new(arel_table)
sm.project(*Arel::Nodes::Count.new([Arel.star]))

connection.select_values(sm, "#{self.class} Count").first
@pool.with_connection do |connection|
connection.select_values(sm, "#{self.class} Count").first
end
end

def create_table_and_set_flags(environment, schema_sha1 = nil)
return unless enabled?

create_table
update_or_create_entry(:environment, environment)
update_or_create_entry(:schema_sha1, schema_sha1) if schema_sha1
@pool.with_connection do |connection|
create_table
update_or_create_entry(connection, :environment, environment)
update_or_create_entry(connection, :schema_sha1, schema_sha1) if schema_sha1
end
end

# Creates an internal metadata table with columns +key+ and +value+
def create_table
return unless enabled?

unless connection.table_exists?(table_name)
connection.create_table(table_name, id: false) do |t|
t.string :key, **connection.internal_string_options_for_primary_key
t.string :value
t.timestamps
@pool.with_connection do |connection|
unless connection.table_exists?(table_name)
connection.create_table(table_name, id: false) do |t|
t.string :key, **connection.internal_string_options_for_primary_key
t.string :value
t.timestamps
end
end
end
end

def drop_table
return unless enabled?

connection.drop_table table_name, if_exists: true
@pool.with_connection do |connection|
connection.drop_table table_name, if_exists: true
end
end

def table_exists?
@connection.schema_cache.data_source_exists?(table_name)
@pool.schema_cache.data_source_exists?(table_name)
end

private
def update_or_create_entry(key, value)
entry = select_entry(key)
def update_or_create_entry(connection, key, value)
entry = select_entry(connection, key)

if entry
if entry[value_key] != value
update_entry(key, value)
update_entry(connection, key, value)
else
entry[value_key]
end
else
create_entry(key, value)
create_entry(connection, key, value)
end
end

def current_time
def current_time(connection)
connection.default_timezone == :utc ? Time.now.utc : Time.now
end

def create_entry(key, value)
def create_entry(connection, key, value)
im = Arel::InsertManager.new(arel_table)
im.insert [
[arel_table[primary_key], key],
[arel_table[value_key], value],
[arel_table[:created_at], current_time],
[arel_table[:updated_at], current_time]
[arel_table[:created_at], current_time(connection)],
[arel_table[:updated_at], current_time(connection)]
]

connection.insert(im, "#{self.class} Create", primary_key, key)
end

def update_entry(key, new_value)
def update_entry(connection, key, new_value)
um = Arel::UpdateManager.new(arel_table)
um.set [
[arel_table[value_key], new_value],
[arel_table[:updated_at], current_time]
[arel_table[:updated_at], current_time(connection)]
]

um.where(arel_table[primary_key].eq(key))

connection.update(um, "#{self.class} Update")
end

def select_entry(key)
def select_entry(connection, key)
sm = Arel::SelectManager.new(arel_table)
sm.project(Arel::Nodes::SqlLiteral.new("*"))
sm.where(arel_table[primary_key].eq(Arel::Nodes::BindParam.new(key)))
Expand Down
Loading

0 comments on commit a918394

Please sign in to comment.