diff --git a/lib/ood_core/job/adapters/pbspro.rb b/lib/ood_core/job/adapters/pbspro.rb index 8e877944c..d27697203 100644 --- a/lib/ood_core/job/adapters/pbspro.rb +++ b/lib/ood_core/job/adapters/pbspro.rb @@ -10,12 +10,15 @@ class Factory # @param config [#to_h] the configuration for job adapter # @option config [Object] :host (nil) The batch server host # @option config [Object] :exec (nil) Path to PBS Pro executables + # @option config [Object] :qstat_factor (nil) Deciding factor on how to + # call qstat for a user def self.build_pbspro(config) c = config.to_h.compact.symbolize_keys host = c.fetch(:host, nil) exec = c.fetch(:exec, nil) + qstat_factor = c.fetch(:qstat_factor, nil) pbspro = Adapters::PBSPro::Batch.new(host: host, exec: exec) - Adapters::PBSPro.new(pbspro: pbspro) + Adapters::PBSPro.new(pbspro: pbspro, qstat_factor: qstat_factor) end end @@ -23,6 +26,7 @@ module Adapters # An adapter object that describes the communication with a PBS Pro # resource manager for job management. class PBSPro < Adapter + using Refinements::ArrayExtensions using Refinements::HashExtensions # Object used for simplified communication with a PBS Pro batch server @@ -90,6 +94,15 @@ def get_jobs(id: "") jobs.reject { |j| /\[\]/ =~ j[:job_id] } # drop main job array jobs end + # Select batch jobs from the batch server + # @param args [Array<#to_s>] arguments passed to `qselect` command + # @raise [Error] if `qselect` command exited unsuccessfully + # @return [Array] list of job ids that match selection + # criteria + def select_jobs(args: []) + call("qselect", *args).split("\n").map(&:strip) + end + # Put a specified job on hold # @example Put job "1234" on hold # my_batch.hold_job("1234") @@ -161,14 +174,22 @@ def call(cmd, *args, env: {}, stdin: "", chdir: nil) # ignore B as it signifies a job array } + # What percentage of jobs a user owns out of all jobs, used to decide + # whether we filter the owner's jobs from a `qstat` of all jobs or call + # `qstat` on each of the owner's individual jobs + # @return [Float] ratio of owner's jobs to all jobs + attr_reader :qstat_factor + # @api private # @param opts [#to_h] the options defining this adapter # @option opts [Batch] :pbspro The PBS Pro batch object + # @option opts [#to_f] :qstat_factor (0.10) The qstat deciding factor # @see Factory.build_pbspro def initialize(opts = {}) o = opts.to_h.compact.symbolize_keys @pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" } + @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f end # Submit a job with the attributes defined in the job template instance @@ -254,6 +275,26 @@ def info_all raise JobAdapterError, e.message end + # Retrieve info for all jobs for a given owner or owners from the + # resource manager + # @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs + # @raise [JobAdapterError] if something goes wrong getting job info + # @return [Array] information describing submitted jobs + def info_where_owner(owner) + owner = Array.wrap(owner).map(&:to_s) + + usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")]) + all_jobs = @pbspro.select_jobs(args: ["-T"]) + + # `qstat` all jobs if user has too many jobs, otherwise `qstat` each + # individual job (default factor is 10%) + if usr_jobs.size > (qstat_factor * all_jobs.size) + super + else + usr_jobs.map { |id| info(id) } + end + end + # Retrieve job info from the resource manager # @param id [#to_s] the id of the job # @raise [JobAdapterError] if something goes wrong getting job info diff --git a/spec/job/adapters/pbspro_spec.rb b/spec/job/adapters/pbspro_spec.rb index 8c2b97772..664f922b4 100644 --- a/spec/job/adapters/pbspro_spec.rb +++ b/spec/job/adapters/pbspro_spec.rb @@ -4,12 +4,14 @@ describe OodCore::Job::Adapters::PBSPro do # Required arguments let(:pbspro) { double() } + let(:qstat_factor) { nil } # Subject - subject(:adapter) { described_class.new(pbspro: pbspro) } + subject(:adapter) { described_class.new(pbspro: pbspro, qstat_factor: qstat_factor) } it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) } it { is_expected.to respond_to(:info_all).with(0).arguments } + it { is_expected.to respond_to(:info_where_owner).with(1).argument } it { is_expected.to respond_to(:info).with(1).argument } it { is_expected.to respond_to(:status).with(1).argument } it { is_expected.to respond_to(:hold).with(1).argument } @@ -244,6 +246,105 @@ def build_script(opts = {}) end end + describe "#info_where_owner" do + let(:job_owner) { "job_owner" } + let(:pbspro) { double(select_jobs: job_ids) } + subject { adapter.info_where_owner(job_owner) } + + context "owner has no jobs" do + let(:job_ids) { [] } + + it { is_expected.to eq([]) } + end + + context "when given list of owners" do + let(:job_ids) { [] } + let(:job_owner) { ["job_owner_1", "job_owner_2"] } + + it "uses comma delimited owner list" do + expect(pbspro).to receive(:select_jobs).with(args: ["-u", job_owner.join(",")]) + is_expected.to eq([]) + end + end + + context "when owner has multiple jobs" do + let(:qstat_factor) { 1.00 } + let(:job_ids) { [ "job_id_1", "job_id_2" ] } + let(:job_hash_1) { + { + :job_id=>"job_id_1", + :Job_Owner=>"#{job_owner}@server", + :job_state=>"Q", + } + } + let(:job_hash_2) { + { + :job_id=>"job_id_2", + :Job_Owner=>"#{job_owner}@server", + :job_state=>"Q", + } + } + + before do + allow(pbspro).to receive(:get_jobs).with(id: "job_id_1").and_return([job_hash_1]) + allow(pbspro).to receive(:get_jobs).with(id: "job_id_2").and_return([job_hash_2]) + end + + it "returns list of OodCore::Job::Info objects" do + is_expected.to eq([ + OodCore::Job::Info.new( + :id=>"job_id_1", + :job_owner=>job_owner, + :submit_host=>"server", + :status=>:queued, + :procs=>0, + :native=>job_hash_1 + ), + OodCore::Job::Info.new( + :id=>"job_id_2", + :job_owner=>job_owner, + :submit_host=>"server", + :status=>:queued, + :procs=>0, + :native=>job_hash_2 + ) + ]) + end + + context "and when OodCore::Job::Adapters::PBSPro::Batch::Error is raised" do + let(:msg) { "random error" } + before do + allow(pbspro).to receive(:get_jobs).with(id: "job_id_1").and_raise(OodCore::Job::Adapters::PBSPro::Batch::Error, msg) + end + + it "raises OodCore::JobAdapterError" do + expect { subject }.to raise_error(OodCore::JobAdapterError) + end + + context "due to invalid job id" do + let(:msg) { "qstat: Unknown Job Id job_id\n" } + + it "returns list of OodCore::Job::Info objects with a completed job" do + is_expected.to eq([ + OodCore::Job::Info.new( + :id=>"job_id_1", + :status=>:completed + ), + OodCore::Job::Info.new( + :id=>"job_id_2", + :job_owner=>job_owner, + :submit_host=>"server", + :status=>:queued, + :procs=>0, + :native=>job_hash_2 + ) + ]) + end + end + end + end + end + describe "#info" do context "when id is not defined" do it "raises ArgumentError" do