Skip to content

Commit

Permalink
Merge pull request #1591 from broadinstitute/jb-de-service-bugfix
Browse files Browse the repository at this point in the history
Removing `skip_push` parameter for `IngestJob.push_remote_and_launch_ingest` (SCP-4576)
  • Loading branch information
bistline authored Aug 15, 2022
2 parents b15a57c + 357d0bf commit 8610bbb
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 27 deletions.
33 changes: 27 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ GEM
crass (1.0.6)
daemons (1.4.0)
declarative (0.0.20)
delayed_job (4.1.9)
activesupport (>= 3.0, < 6.2)
delayed_job (4.1.10)
activesupport (>= 3.0, < 8.0)
delayed_job_mongoid (2.3.1)
delayed_job (>= 3.0, < 5)
mongoid (>= 3.0, < 8)
Expand All @@ -157,8 +157,29 @@ GEM
factory_bot_rails (6.1.0)
factory_bot (~> 6.1.0)
railties (>= 5.0.0)
faraday (0.17.5)
multipart-post (>= 1.2, < 3)
faraday (1.10.1)
faraday-em_http (~> 1.0)
faraday-em_synchrony (~> 1.0)
faraday-excon (~> 1.1)
faraday-httpclient (~> 1.0)
faraday-multipart (~> 1.0)
faraday-net_http (~> 1.0)
faraday-net_http_persistent (~> 1.0)
faraday-patron (~> 1.0)
faraday-rack (~> 1.0)
faraday-retry (~> 1.0)
ruby2_keywords (>= 0.0.4)
faraday-em_http (1.0.0)
faraday-em_synchrony (1.0.0)
faraday-excon (1.1.0)
faraday-httpclient (1.0.1)
faraday-multipart (1.0.4)
multipart-post (~> 2)
faraday-net_http (1.0.1)
faraday-net_http_persistent (1.2.0)
faraday-patron (1.0.0)
faraday-rack (1.0.0)
faraday-retry (1.0.3)
ffi (1.15.5)
flamegraph (0.9.5)
gems (1.2.0)
Expand Down Expand Up @@ -447,8 +468,8 @@ GEM
sdoc (2.1.0)
rdoc (>= 5.0)
secure_headers (6.3.2)
sentry-raven (2.13.0)
faraday (>= 0.7.6, < 1.0)
sentry-raven (3.1.2)
faraday (>= 1.0)
signet (0.17.0)
addressable (~> 2.8)
faraday (>= 0.17.5, < 3.a)
Expand Down
4 changes: 2 additions & 2 deletions app/lib/differential_expression_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def self.run_differential_expression_job(cluster_file, study, user, annotation_n
# launch DE job
job = IngestJob.new(study: study, study_file: cluster_file, user: user, action: :differential_expression,
params_object: params_object)
job.delay.push_remote_and_launch_ingest(skip_push: true) # skip push as file is already in bucket
job.delay.push_remote_and_launch_ingest
true
else
raise ArgumentError, "job parameters failed to validate: #{params_object.errors.full_messages}"
Expand All @@ -198,7 +198,7 @@ def self.run_differential_expression_job(cluster_file, study, user, annotation_n
def self.validate_annotation(cluster_file, study, annotation_name, annotation_scope)
cluster = study.cluster_groups.by_name(cluster_file.name)
raise ArgumentError, "cannot find cluster for #{cluster_file.name}" if cluster.nil?

result = DifferentialExpressionResult.find_by(study: study,
cluster_group: cluster,
annotation_name: annotation_name,
Expand Down
2 changes: 1 addition & 1 deletion app/lib/file_parse_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def self.run_parse_job(study_file, study, user, reparse: false, persist_on_fail:
bundle.study_files.update_all(parse_status: 'parsing')
job = IngestJob.new(study: study, study_file: matrix, user: user, action: :ingest_expression, reparse: reparse,
persist_on_fail: persist_on_fail)
job.delay.push_remote_and_launch_ingest(skip_push: true)
job.delay.push_remote_and_launch_ingest
else
return self.missing_bundled_file(study_file)
end
Expand Down
17 changes: 5 additions & 12 deletions app/models/ingest_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ class IngestJob
# Push a file to a workspace bucket in the background and then launch an ingest run and queue polling
# Can also clear out existing data if necessary (in case of a re-parse)
#
# * *params*
# - +skip_push+ (Boolean) => skip call to study.send_to_firecloud(study_file) (may be in process in different thread)
# * *yields*
# - (Google::Apis::GenomicsV2alpha1::Operation) => Will submit an ingest job in PAPI
# - (IngestJob.new(attributes).poll_for_completion) => Will queue a Delayed::Job to poll for completion
#
# * *raises*
# - (RuntimeError) => If file cannot be pushed to remote bucket
def push_remote_and_launch_ingest(skip_push: false)
def push_remote_and_launch_ingest
begin
file_identifier = "#{study_file.bucket_location}:#{study_file.id}"
rails_model = MODELS_BY_ACTION[action]
Expand All @@ -70,7 +68,7 @@ def push_remote_and_launch_ingest(skip_push: false)
# first check if file is already in bucket (in case user is syncing)
remote = ApplicationController.firecloud_client.get_workspace_file(study.bucket_id, study_file.bucket_location)
if remote.nil?
is_pushed = poll_for_remote(skip_push: skip_push)
is_pushed = poll_for_remote
else
is_pushed = true # file is already in bucket
end
Expand Down Expand Up @@ -110,20 +108,15 @@ def push_remote_and_launch_ingest(skip_push: false)

# helper method to push & poll for remote file
#
# * *params*
# - +skip_push+ (Boolean) => skip call to study.send_to_firecloud(study_file) (may be in process in different thread)
#
# * *returns*
# - (Boolean) => Indication of whether or not file has reached bucket
def poll_for_remote(skip_push: false)
def poll_for_remote
attempts = 1
is_pushed = false
file_identifier = "#{study_file.bucket_location}:#{study_file.id}"
while !is_pushed && attempts <= MAX_ATTEMPTS
unless skip_push
Rails.logger.info "Preparing to push #{file_identifier} to #{study.bucket_id}"
study.send_to_firecloud(study_file)
end
Rails.logger.info "Preparing to push #{file_identifier} to #{study.bucket_id}"
study.send_to_firecloud(study_file)
Rails.logger.info "Polling for upload of #{file_identifier}, attempt #{attempts}"
remote = ApplicationController.firecloud_client.get_workspace_file(study.bucket_id, study_file.bucket_location)
if remote.present?
Expand Down
12 changes: 6 additions & 6 deletions test/integration/lib/differential_expression_service_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DifferentialExpressionServiceTest < ActiveSupport::TestCase

# we need to mock 2 levels deep as :delay should yield the :push_remote_and_launch_ingest mock
job_mock = Minitest::Mock.new
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
mock = Minitest::Mock.new
mock.expect(:delay, job_mock)
IngestJob.stub :new, mock do
Expand Down Expand Up @@ -155,7 +155,7 @@ class DifferentialExpressionServiceTest < ActiveSupport::TestCase
DataArray.create(data_array_params)

job_mock = Minitest::Mock.new
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
mock = Minitest::Mock.new
mock.expect(:delay, job_mock)
IngestJob.stub :new, mock do
Expand All @@ -178,7 +178,7 @@ class DifferentialExpressionServiceTest < ActiveSupport::TestCase
@basic_study.update(default_options: { cluster: 'cluster_diffexp.txt', annotation: 'species--group--study' })
DataArray.create!(@all_cells_array_params)
job_mock = Minitest::Mock.new
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
mock = Minitest::Mock.new
mock.expect(:delay, job_mock)
IngestJob.stub :new, mock do
Expand All @@ -205,9 +205,9 @@ class DifferentialExpressionServiceTest < ActiveSupport::TestCase
test 'should run differential expression job on all annotations' do
DataArray.create!(@all_cells_array_params)
job_mock = Minitest::Mock.new
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new, [Hash])
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
job_mock.expect(:push_remote_and_launch_ingest, Delayed::Job.new)
mock = Minitest::Mock.new
mock.expect(:delay, job_mock)
mock.expect(:delay, job_mock)
Expand Down

0 comments on commit 8610bbb

Please sign in to comment.