Skip to content

Commit

Permalink
Merge pull request #47 from OSC/pbspro-info-where-owner
Browse files Browse the repository at this point in the history
Add optimized Adapter#info_where_owner code to PBS Pro adapter
  • Loading branch information
ericfranz authored Jul 5, 2017
2 parents 706f307 + 3cd5e65 commit 4f3bce5
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 2 deletions.
43 changes: 42 additions & 1 deletion lib/ood_core/job/adapters/pbspro.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,23 @@ class Factory
# @param config [#to_h] the configuration for job adapter
# @option config [Object] :host (nil) The batch server host
# @option config [Object] :exec (nil) Path to PBS Pro executables
# @option config [Object] :qstat_factor (nil) Deciding factor on how to
# call qstat for a user
def self.build_pbspro(config)
c = config.to_h.compact.symbolize_keys
host = c.fetch(:host, nil)
exec = c.fetch(:exec, nil)
qstat_factor = c.fetch(:qstat_factor, nil)
pbspro = Adapters::PBSPro::Batch.new(host: host, exec: exec)
Adapters::PBSPro.new(pbspro: pbspro)
Adapters::PBSPro.new(pbspro: pbspro, qstat_factor: qstat_factor)
end
end

module Adapters
# An adapter object that describes the communication with a PBS Pro
# resource manager for job management.
class PBSPro < Adapter
using Refinements::ArrayExtensions
using Refinements::HashExtensions

# Object used for simplified communication with a PBS Pro batch server
Expand Down Expand Up @@ -90,6 +94,15 @@ def get_jobs(id: "")
jobs.reject { |j| /\[\]/ =~ j[:job_id] } # drop main job array jobs
end

# Select batch jobs from the batch server
# @param args [Array<#to_s>] arguments passed to `qselect` command
# @raise [Error] if `qselect` command exited unsuccessfully
# @return [Array<String>] list of job ids that match selection
# criteria
def select_jobs(args: [])
call("qselect", *args).split("\n").map(&:strip)
end

# Put a specified job on hold
# @example Put job "1234" on hold
# my_batch.hold_job("1234")
Expand Down Expand Up @@ -161,14 +174,22 @@ def call(cmd, *args, env: {}, stdin: "", chdir: nil)
# ignore B as it signifies a job array
}

# What percentage of jobs a user owns out of all jobs, used to decide
# whether we filter the owner's jobs from a `qstat` of all jobs or call
# `qstat` on each of the owner's individual jobs
# @return [Float] ratio of owner's jobs to all jobs
attr_reader :qstat_factor

# @api private
# @param opts [#to_h] the options defining this adapter
# @option opts [Batch] :pbspro The PBS Pro batch object
# @option opts [#to_f] :qstat_factor (0.10) The qstat deciding factor
# @see Factory.build_pbspro
def initialize(opts = {})
o = opts.to_h.compact.symbolize_keys

@pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }
@qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
end

# Submit a job with the attributes defined in the job template instance
Expand Down Expand Up @@ -254,6 +275,26 @@ def info_all
raise JobAdapterError, e.message
end

# Retrieve info for all jobs for a given owner or owners from the
# resource manager
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
def info_where_owner(owner)
owner = Array.wrap(owner).map(&:to_s)

usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
all_jobs = @pbspro.select_jobs(args: ["-T"])

# `qstat` all jobs if user has too many jobs, otherwise `qstat` each
# individual job (default factor is 10%)
if usr_jobs.size > (qstat_factor * all_jobs.size)
super
else
usr_jobs.map { |id| info(id) }
end
end

# Retrieve job info from the resource manager
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if something goes wrong getting job info
Expand Down
103 changes: 102 additions & 1 deletion spec/job/adapters/pbspro_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
describe OodCore::Job::Adapters::PBSPro do
# Required arguments
let(:pbspro) { double() }
let(:qstat_factor) { nil }

# Subject
subject(:adapter) { described_class.new(pbspro: pbspro) }
subject(:adapter) { described_class.new(pbspro: pbspro, qstat_factor: qstat_factor) }

it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) }
it { is_expected.to respond_to(:info_all).with(0).arguments }
it { is_expected.to respond_to(:info_where_owner).with(1).argument }
it { is_expected.to respond_to(:info).with(1).argument }
it { is_expected.to respond_to(:status).with(1).argument }
it { is_expected.to respond_to(:hold).with(1).argument }
Expand Down Expand Up @@ -244,6 +246,105 @@ def build_script(opts = {})
end
end

describe "#info_where_owner" do
let(:job_owner) { "job_owner" }
let(:pbspro) { double(select_jobs: job_ids) }
subject { adapter.info_where_owner(job_owner) }

context "owner has no jobs" do
let(:job_ids) { [] }

it { is_expected.to eq([]) }
end

context "when given list of owners" do
let(:job_ids) { [] }
let(:job_owner) { ["job_owner_1", "job_owner_2"] }

it "uses comma delimited owner list" do
expect(pbspro).to receive(:select_jobs).with(args: ["-u", job_owner.join(",")])
is_expected.to eq([])
end
end

context "when owner has multiple jobs" do
let(:qstat_factor) { 1.00 }
let(:job_ids) { [ "job_id_1", "job_id_2" ] }
let(:job_hash_1) {
{
:job_id=>"job_id_1",
:Job_Owner=>"#{job_owner}@server",
:job_state=>"Q",
}
}
let(:job_hash_2) {
{
:job_id=>"job_id_2",
:Job_Owner=>"#{job_owner}@server",
:job_state=>"Q",
}
}

before do
allow(pbspro).to receive(:get_jobs).with(id: "job_id_1").and_return([job_hash_1])
allow(pbspro).to receive(:get_jobs).with(id: "job_id_2").and_return([job_hash_2])
end

it "returns list of OodCore::Job::Info objects" do
is_expected.to eq([
OodCore::Job::Info.new(
:id=>"job_id_1",
:job_owner=>job_owner,
:submit_host=>"server",
:status=>:queued,
:procs=>0,
:native=>job_hash_1
),
OodCore::Job::Info.new(
:id=>"job_id_2",
:job_owner=>job_owner,
:submit_host=>"server",
:status=>:queued,
:procs=>0,
:native=>job_hash_2
)
])
end

context "and when OodCore::Job::Adapters::PBSPro::Batch::Error is raised" do
let(:msg) { "random error" }
before do
allow(pbspro).to receive(:get_jobs).with(id: "job_id_1").and_raise(OodCore::Job::Adapters::PBSPro::Batch::Error, msg)
end

it "raises OodCore::JobAdapterError" do
expect { subject }.to raise_error(OodCore::JobAdapterError)
end

context "due to invalid job id" do
let(:msg) { "qstat: Unknown Job Id job_id\n" }

it "returns list of OodCore::Job::Info objects with a completed job" do
is_expected.to eq([
OodCore::Job::Info.new(
:id=>"job_id_1",
:status=>:completed
),
OodCore::Job::Info.new(
:id=>"job_id_2",
:job_owner=>job_owner,
:submit_host=>"server",
:status=>:queued,
:procs=>0,
:native=>job_hash_2
)
])
end
end
end
end
end

describe "#info" do
context "when id is not defined" do
it "raises ArgumentError" do
Expand Down

0 comments on commit 4f3bce5

Please sign in to comment.