From 0e48f38c5da3e84d7bfa4ccf3f387dc07fde7237 Mon Sep 17 00:00:00 2001 From: Adrianna Chang Date: Thu, 4 Apr 2024 15:53:40 -0400 Subject: [PATCH] wip --- Gemfile.lock | 16 +++-- .../abstract/database_statements.rb | 66 ++++++++++++++++--- .../abstract/query_cache.rb | 4 +- .../lib/active_record/future_result.rb | 9 ++- .../test/cases/relation/load_async_test.rb | 3 +- 5 files changed, 78 insertions(+), 20 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index eef6c80576aa4..1b26b23b4435d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -207,6 +207,7 @@ GEM rake (>= 12.0.0, < 14.0.0) drb (2.2.0) ruby2_keywords + error_highlight (0.6.0) erubi (1.12.0) et-orbi (1.2.7) tzinfo @@ -271,6 +272,8 @@ GEM googleauth (>= 0.16.2, < 2.a) mini_mime (~> 1.0) google-protobuf (3.25.1) + google-protobuf (3.25.1-x86_64-darwin) + google-protobuf (3.25.1-x86_64-linux) googleauth (1.9.1) faraday (>= 1.0, < 3.a) google-cloud-env (~> 2.1) @@ -482,13 +485,13 @@ GEM rubyzip (2.3.2) rufus-scheduler (3.9.1) fugit (~> 1.1, >= 1.1.6) - sass-embedded (1.69.6) - google-protobuf (~> 3.25) + sass-embedded (1.74.1) + google-protobuf (>= 3.25, < 5.0) rake (>= 13.0.0) - sass-embedded (1.69.6-x86_64-darwin) - google-protobuf (~> 3.25) - sass-embedded (1.69.6-x86_64-linux-gnu) - google-protobuf (~> 3.25) + sass-embedded (1.74.1-x86_64-darwin) + google-protobuf (>= 3.25, < 5.0) + sass-embedded (1.74.1-x86_64-linux-gnu) + google-protobuf (>= 3.25, < 5.0) selenium-webdriver (4.16.0) rexml (~> 3.2, >= 3.2.5) rubyzip (>= 1.2.2, < 3.0) @@ -606,6 +609,7 @@ DEPENDENCIES debug (>= 1.1.0) delayed_job delayed_job_active_record + error_highlight (>= 0.4.0) google-cloud-storage (~> 1.11) httpclient! image_processing (~> 1.2) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 9df12249499e4..c4f0b6cc661cb 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -144,8 +144,32 @@ def execute(sql, name = nil, allow_retry: false) # Note: the query is assumed to have side effects and the query cache # will be cleared. If the query is read-only, consider using #select_all # instead. - def exec_query(sql, name = "SQL", binds = [], prepare: false) - internal_exec_query(sql, name, binds, prepare: prepare) + def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false) + # require "debug"; debugger + if async && async_enabled? + if current_transaction.joinable? + raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions" + end + future_result = ActiveRecord::FutureResult::SelectAll.new( + pool, + sql, + name, + binds, + prepare: prepare, + ) + if supports_concurrent_connections? && current_transaction.closed? + future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session) + else + future_result.execute!(self) + end + return future_result + end + result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry) + if async + FutureResult.wrap(result) + else + result + end end # Executes insert +sql+ statement in the context of this connection using @@ -468,18 +492,42 @@ def insert_fixture(fixture, table_name) execute(build_fixture_sql(Array.wrap(fixture), table_name), "Fixture Insert") end - def insert_fixtures_set(fixture_set, tables_to_delete = []) - fixture_inserts = build_fixture_statements(fixture_set) - table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" } - statements = table_deletes + fixture_inserts + # TO DO + # Parallelize fixture sets based on table + # Insert with async API, internal only + # exec_query(sql, async: true) + # fixture set: { table => [fixture_set]} + # should we still try to combine queries to reduce number of trips to db? + # Idea: combine delete + insert for same table into single statement + # no transactions for async, is this safe to remove? + def insert_fixtures_set(fixture_set, tables_to_delete = []) with_multi_statements do - transaction(requires_new: true) do - disable_referential_integrity do - execute_batch(statements, "Fixtures Load") + disable_referential_integrity do + fixture_set.each do |table_name, fixtures| + next if fixtures.empty? + + fixture_inserts = build_fixture_sql(fixtures, table_name) + # ONLY DELETE IF in tables_to_delete + statement = "DELETE FROM #{quote_table_name(table_name)};\n" + transform_query(fixture_inserts) + + exec_query(statement, "Fixture Load for #{table_name}", async: true) end end end + + # PREVIOUS IMPLEMENTATION + # fixture_inserts = build_fixture_statements(fixture_set) + # table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" } + # statements = table_deletes + fixture_inserts + + # with_multi_statements do + # transaction(requires_new: true) do + # disable_referential_integrity do + # execute_batch(statements, "Fixtures Load") + # end + # end + # end end def empty_insert_statement_value(primary_key = nil) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 7f14408449c31..3f3133bfb0e31 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -186,8 +186,8 @@ def enable_query_cache! # # Set dirties: false to prevent query caches on all connections from being cleared by write operations. # (By default, write operations dirty all connections' query caches in case they are replicas whose cache would now be outdated.) - def uncached(dirties: true, &) - pool.disable_query_cache(dirties: dirties, &) + def uncached(dirties: true, &block) + pool.disable_query_cache(dirties: dirties, &block) end def disable_query_cache! diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index f55473ff2cd89..36eb56328fcfc 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -155,7 +155,9 @@ def execute_or_wait end def execute_query(connection, async: false) - @result = exec_query(connection, *@args, **@kwargs, async: async) + connection.send(:with_multi_statements) do + @result = exec_query(connection, *@args, **@kwargs, async: async) + end rescue => error @error = error ensure @@ -163,7 +165,10 @@ def execute_query(connection, async: false) end def exec_query(connection, *args, **kwargs) - connection.internal_exec_query(*args, **kwargs) + connection.send(:with_raw_connection) do |conn| + connection.internal_exec_query(*args, **kwargs) + conn.next_result while conn.more_results_exist? + end end class SelectAll < FutureResult # :nodoc: diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb index 99eccfd1739a4..dd9da3fc242da 100644 --- a/activerecord/test/cases/relation/load_async_test.rb +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -147,7 +147,8 @@ def test_simple_query end end - deferred_posts = Post.where(author_id: 1).load_async + deferred_posts = Post.lease_connection.exec_query("SELECT id FROM posts WHERE author_id = 1", async: true) + # deferred_posts = Post.where(author_id: 1).load_async wait_for_async_query assert_equal expected_records, deferred_posts.to_a