Skip to content

Commit

Permalink
Merge pull request #23 from doximity/jcw/fix_empty_toplevel_batch_bug
Browse files Browse the repository at this point in the history
AT-172 Fix empty toplevel batch bug and add step batch description
  • Loading branch information
jcwilk authored Aug 19, 2022
2 parents e21cd00 + 7bad907 commit 48b9137
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 34 deletions.
1 change: 1 addition & 0 deletions lib/simplekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "simplekiq/orchestration"
require "simplekiq/orchestration_job"
require "simplekiq/batching_job"
require "simplekiq/batch_tracker_job"

module Simplekiq
class << self
Expand Down
22 changes: 22 additions & 0 deletions lib/simplekiq/batch_tracker_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

# This job serves two purposes:
# * TODO: It provides a convenient way to track top-level orchestration batches
# * The top-level orchestration batch would otherwise be empty (aside from
# child-batches) and all sidekiq-pro batches must have at least 1 job

module Simplekiq
class BatchTrackerJob
include Sidekiq::Worker

def perform(klass_name, bid, args)
# In the future, this will likely surface the toplevel batch to a callback method on the
# orchestration job. We're holding off on this until we have time to design a comprehensive
# plan for providing simplekiq-wide instrumentation, ideally while being backwards compatible
# for in-flight orchestrations.

# For now, it's just satisfying the all-batches-must-have-jobs limitation in sidekiq-pro
# described at the head of the file.
end
end
end
37 changes: 21 additions & 16 deletions lib/simplekiq/orchestration_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,40 @@ def self.execute(args:, job:, workflow:)
orchestration_batch.description = "#{job.class.name} Simplekiq orchestration"
Simplekiq.auto_define_callbacks(orchestration_batch, args: args, job: job)

new.run_step(orchestration_batch, workflow, 0)
orchestration_batch.jobs do
Simplekiq::BatchTrackerJob.perform_async(job.class.name, orchestration_batch.bid, args)

new.run_step(workflow, 0)
end
end

def run_step(orchestration_batch, workflow, step)
def run_step(workflow, step)
*jobs = workflow.at(step)
# This will never be empty because Orchestration#serialized_workflow skips inserting
# a new step for in_parallel if there were no inner jobs specified.

orchestration_batch.jobs do
step_batch = Sidekiq::Batch.new
step_batch.on(
"success",
self.class,
{"orchestration_workflow" => workflow, "step" => step + 1}
)

step_batch.jobs do
jobs.each do |job|
Object.const_get(job["klass"]).perform_async(*job["args"])
end
next_step = step + 1
step_batch = Sidekiq::Batch.new
step_batch.description = "Simplekiq orchestrated step #{next_step}"
step_batch.on(
"success",
self.class,
{"orchestration_workflow" => workflow, "step" => next_step}
)

step_batch.jobs do
jobs.each do |job|
Object.const_get(job["klass"]).perform_async(*job["args"])
end
end
end

def on_success(status, options)
return if options["step"] == options["orchestration_workflow"].length

orchestration_batch = Sidekiq::Batch.new(status.parent_bid)
run_step(orchestration_batch, options["orchestration_workflow"], options["step"])
Sidekiq::Batch.new(status.parent_bid).jobs do
run_step(options["orchestration_workflow"], options["step"])
end
end
end
end
42 changes: 24 additions & 18 deletions spec/orchestration_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,29 @@ def execute
described_class.execute(args: ["some", "args"], job: job, workflow: workflow)
end

it "kicks off the first step with a new batch" do
batch_double = instance_double(Sidekiq::Batch)
it "kicks off the first step with a new batch with the empty tracking batch inside it" do
batch_double = instance_double(Sidekiq::Batch, bid: 42)
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
expect(batch_double).to receive(:description=).with("FakeOrchestration Simplekiq orchestration")
expect(batch_double).to receive(:on).with("success", FakeOrchestration, "args" => ["some", "args"])

batch_stack_depth = 0 # to keep track of how deeply nested within batches we are
expect(batch_double).to receive(:jobs) do |&block|
batch_stack_depth += 1
block.call
batch_stack_depth -= 1
end

expect(Simplekiq::BatchTrackerJob).to receive(:perform_async) do
expect(batch_stack_depth).to eq 1
end

instance = instance_double(Simplekiq::OrchestrationExecutor)
allow(Simplekiq::OrchestrationExecutor).to receive(:new).and_return(instance)
expect(instance).to receive(:run_step).with(batch_double, workflow, 0)
expect(instance).to receive(:run_step) do |workflow_arg, step|
expect(batch_stack_depth).to eq 1
expect(step).to eq 0
end

execute
end
Expand All @@ -55,29 +69,20 @@ def execute
end

describe "run_step" do
let(:orchestration_batch) { instance_double(Sidekiq::Batch) }
let(:step_batch) { instance_double(Sidekiq::Batch) }
let(:step) { 0 }
let(:instance) { described_class.new }

it "runs the next job within a new step batch which is within the orchestration batch" do
batch_stack = [] # to keep track of how deeply nested within batches we are
expect(orchestration_batch).to receive(:jobs) do |&block|
expect(batch_stack).to be_empty

batch_stack.push("orchestration")
block.call
batch_stack.shift
end
it "runs the next job within a new step batch" do
batch_stack_depth = 0 # to keep track of how deeply nested within batches we are
expect(step_batch).to receive(:jobs) do |&block|
expect(batch_stack).to eq ["orchestration"]
batch_stack.push("step")
batch_stack_depth += 1
block.call
batch_stack.shift
batch_stack_depth -= 1
end

expect(OrcTest::JobA).to receive(:perform_async) do |arg|
expect(batch_stack).to eq ["orchestration", "step"]
expect(batch_stack_depth).to eq 1
expect(arg).to eq 1
end

Expand All @@ -86,8 +91,9 @@ def execute
"orchestration_workflow" => workflow,
"step" => 1
})
expect(step_batch).to receive(:description=).with("Simplekiq orchestrated step 1")

instance.run_step(orchestration_batch, workflow, 0)
instance.run_step(workflow, 0)
end
end
end

0 comments on commit 48b9137

Please sign in to comment.