Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianna-chang-shopify committed Apr 17, 2024
1 parent 1997678 commit 0e48f38
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 20 deletions.
16 changes: 10 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ GEM
rake (>= 12.0.0, < 14.0.0)
drb (2.2.0)
ruby2_keywords
error_highlight (0.6.0)
erubi (1.12.0)
et-orbi (1.2.7)
tzinfo
Expand Down Expand Up @@ -271,6 +272,8 @@ GEM
googleauth (>= 0.16.2, < 2.a)
mini_mime (~> 1.0)
google-protobuf (3.25.1)
google-protobuf (3.25.1-x86_64-darwin)
google-protobuf (3.25.1-x86_64-linux)
googleauth (1.9.1)
faraday (>= 1.0, < 3.a)
google-cloud-env (~> 2.1)
Expand Down Expand Up @@ -482,13 +485,13 @@ GEM
rubyzip (2.3.2)
rufus-scheduler (3.9.1)
fugit (~> 1.1, >= 1.1.6)
sass-embedded (1.69.6)
google-protobuf (~> 3.25)
sass-embedded (1.74.1)
google-protobuf (>= 3.25, < 5.0)
rake (>= 13.0.0)
sass-embedded (1.69.6-x86_64-darwin)
google-protobuf (~> 3.25)
sass-embedded (1.69.6-x86_64-linux-gnu)
google-protobuf (~> 3.25)
sass-embedded (1.74.1-x86_64-darwin)
google-protobuf (>= 3.25, < 5.0)
sass-embedded (1.74.1-x86_64-linux-gnu)
google-protobuf (>= 3.25, < 5.0)
selenium-webdriver (4.16.0)
rexml (~> 3.2, >= 3.2.5)
rubyzip (>= 1.2.2, < 3.0)
Expand Down Expand Up @@ -606,6 +609,7 @@ DEPENDENCIES
debug (>= 1.1.0)
delayed_job
delayed_job_active_record
error_highlight (>= 0.4.0)
google-cloud-storage (~> 1.11)
httpclient!
image_processing (~> 1.2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,32 @@ def execute(sql, name = nil, allow_retry: false)
# Note: the query is assumed to have side effects and the query cache
# will be cleared. If the query is read-only, consider using #select_all
# instead.
def exec_query(sql, name = "SQL", binds = [], prepare: false)
internal_exec_query(sql, name, binds, prepare: prepare)
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false)
# require "debug"; debugger
if async && async_enabled?
if current_transaction.joinable?
raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions"
end
future_result = ActiveRecord::FutureResult::SelectAll.new(
pool,
sql,
name,
binds,
prepare: prepare,
)
if supports_concurrent_connections? && current_transaction.closed?
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
else
future_result.execute!(self)
end
return future_result
end
result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry)
if async
FutureResult.wrap(result)
else
result
end
end

# Executes insert +sql+ statement in the context of this connection using
Expand Down Expand Up @@ -468,18 +492,42 @@ def insert_fixture(fixture, table_name)
execute(build_fixture_sql(Array.wrap(fixture), table_name), "Fixture Insert")
end

def insert_fixtures_set(fixture_set, tables_to_delete = [])
fixture_inserts = build_fixture_statements(fixture_set)
table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" }
statements = table_deletes + fixture_inserts
# TO DO
# Parallelize fixture sets based on table
# Insert with async API, internal only
# exec_query(sql, async: true)
# fixture set: { table => [fixture_set]}
# should we still try to combine queries to reduce number of trips to db?
# Idea: combine delete + insert for same table into single statement
# no transactions for async, is this safe to remove?

def insert_fixtures_set(fixture_set, tables_to_delete = [])
with_multi_statements do
transaction(requires_new: true) do
disable_referential_integrity do
execute_batch(statements, "Fixtures Load")
disable_referential_integrity do
fixture_set.each do |table_name, fixtures|
next if fixtures.empty?

fixture_inserts = build_fixture_sql(fixtures, table_name)
# ONLY DELETE IF in tables_to_delete
statement = "DELETE FROM #{quote_table_name(table_name)};\n" + transform_query(fixture_inserts)

exec_query(statement, "Fixture Load for #{table_name}", async: true)
end
end
end

# PREVIOUS IMPLEMENTATION
# fixture_inserts = build_fixture_statements(fixture_set)
# table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" }
# statements = table_deletes + fixture_inserts

# with_multi_statements do
# transaction(requires_new: true) do
# disable_referential_integrity do
# execute_batch(statements, "Fixtures Load")
# end
# end
# end
end

def empty_insert_statement_value(primary_key = nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ def enable_query_cache!
#
# Set <tt>dirties: false</tt> to prevent query caches on all connections from being cleared by write operations.
# (By default, write operations dirty all connections' query caches in case they are replicas whose cache would now be outdated.)
def uncached(dirties: true, &)
pool.disable_query_cache(dirties: dirties, &)
def uncached(dirties: true, &block)
pool.disable_query_cache(dirties: dirties, &block)
end

def disable_query_cache!
Expand Down
9 changes: 7 additions & 2 deletions activerecord/lib/active_record/future_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,20 @@ def execute_or_wait
end

def execute_query(connection, async: false)
@result = exec_query(connection, *@args, **@kwargs, async: async)
connection.send(:with_multi_statements) do
@result = exec_query(connection, *@args, **@kwargs, async: async)
end
rescue => error
@error = error
ensure
@pending = false
end

def exec_query(connection, *args, **kwargs)
connection.internal_exec_query(*args, **kwargs)
connection.send(:with_raw_connection) do |conn|
connection.internal_exec_query(*args, **kwargs)
conn.next_result while conn.more_results_exist?
end
end

class SelectAll < FutureResult # :nodoc:
Expand Down
3 changes: 2 additions & 1 deletion activerecord/test/cases/relation/load_async_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def test_simple_query
end
end

deferred_posts = Post.where(author_id: 1).load_async
deferred_posts = Post.lease_connection.exec_query("SELECT id FROM posts WHERE author_id = 1", async: true)
# deferred_posts = Post.where(author_id: 1).load_async
wait_for_async_query

assert_equal expected_records, deferred_posts.to_a
Expand Down

0 comments on commit 0e48f38

Please sign in to comment.