
sgepy

Simple package for simple SGE job submission & monitoring

Table of Contents

Install

Dependencies

Via conda

conda install "dill>=0.3" "pathos>=0.2.8" "tqdm>=4"

sgepy package

From Github

pip install git+https://github.com/nick-youngblut/sgepy.git

Tests

conda install pytest
pytest -s

Usage

Worker class

sgepy.Worker(kwargs=dict(), pkgs=[], threads=1, time='00:59:00', mem=6, gpu=0,
             max_attempts=3, conda_env='snakemake',
             tmp_dir='/ebio/abt3_projects/temp_data/', keep_tmp=False,
             parallel_env='parallel', verbose=False)

Parameters

  • kwargs
    • Keyword arguments passed to the function
  • pkgs
    • Python packages used within any functions passed to the worker object
    • e.g., pkgs=['time'] if time.sleep() is used in the function
  • threads
    • Number of parallel processes for the SGE job
  • time
    • Time resource parameter. Format = HH:MM:SS
    • Resource escalation possible (see below)
  • mem
    • Memory resource (GB)
      • Note: the requested memory is multiplied by the number of threads
    • Resource escalation possible (see below)
  • gpu
    • Use a GPU? 0 = no; 1 = yes
  • max_attempts
    • Number of times to re-submit the job
      • This only makes sense if resource escalation is used (see below)
  • conda_env
    • Which conda env to use for the cluster job?
    • Use base for the "standard" conda env
  • tmp_dir
    • Temporary file path
    • Note: by default, the directory will be deleted after the job is complete
  • keep_tmp
    • Keep the temporary file path?
    • This is useful for debugging
  • parallel_env
    • Name of the SGE parallel environment used for multi-threaded jobs
  • verbose
    • Verbose output?
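As the note under mem states, the memory request scales with the thread count; a quick check of that arithmetic (the per-thread interpretation follows the note above):

```python
# Per the note above, the effective memory request is mem * threads.
threads = 4   # parallel processes for the SGE job
mem = 6       # GB per thread (the default value)
total_mem_gb = mem * threads
print(total_mem_gb)  # 24 GB reserved for the whole job
```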

Resource escalation

For time and mem, resource escalation can be used, similar to Snakemake.

To use resource escalation, provide a function for the time or mem parameters. For example:

time = lambda attempt, threads: 8 * attempt ** 3
# the requested time grows as 8 * attempt ** 3 with each submission attempt

The function must have two arguments: attempt and threads

  • attempt
    • The job attempt count (see max_attempts)
  • threads
    • The number of threads used for the job
    • This can be useful for mem so that the resource can be set based on the number of threads used
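Putting the two escalation hooks together, a minimal sketch; both callables take (attempt, threads) as described above, and the mem formula here is illustrative rather than from the package:

```python
# Escalation functions must accept (attempt, threads).
def time_fn(attempt, threads):
    # requested time grows cubically with the attempt number
    return 8 * attempt ** 3

def mem_fn(attempt, threads):
    # illustrative: base 4 GB per thread, doubled on each retry
    return 4 * threads * 2 ** (attempt - 1)

# first vs. second attempt with 4 threads
print(time_fn(1, 4), mem_fn(1, 4))  # 8 16
print(time_fn(2, 4), mem_fn(2, 4))  # 64 32
```

These callables would then be passed as time=time_fn, mem=mem_fn when constructing a Worker or Pool.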

Returns

Worker object that will run a function as a job on the cluster. The object monitors the cluster job and can re-submit it if resource escalation is used (see above).

Object usage

worker_object(function, x)
Parameters
  • function
    • function to run as an SGE job
  • x
    • variable passed to the function as its first parameter

Pool class

sgepy.Pool(njobs=1, kwargs=dict(), pkgs=[], threads=1, time='00:59:00', mem=6, gpu=0,
           max_attempts=3, conda_env='snakemake',
           tmp_dir='/ebio/abt3_projects/temp_data/', keep_tmp=False,
           parallel_env='parallel', verbose=False)

Parameters

  • njobs
    • Number of jobs to submit in parallel
    • i.e., number of parallel workers
  • Other parameters
    • See the Worker class (above)

Returns

A pool object, similar to that generated by the multiprocessing.Pool() class

Object usage

Usage is similar to the multiprocessing.Pool() class.

pool.map(function, x)
Parameters
  • function
    • Function run in each cluster job
  • x
    • Iterable; each value will be processed by an independent cluster job
    • Each value will be processed by the user-provided function
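Since the interface mirrors multiprocessing.Pool, the map semantics can be previewed locally without a cluster; this stand-in uses the thread-backed multiprocessing.dummy.Pool so it runs anywhere (it is not sgepy itself):

```python
from multiprocessing.dummy import Pool  # thread-backed stand-in for a cluster pool

def func1(x, y=1, z=2):
    return x * y * z

# each value in the iterable is processed by an independent worker
with Pool(2) as p:
    results = p.map(func1, [1, 5])
print(results)  # [2, 10]
```

Unlike the standard library pool, sgepy.Pool also forwards its kwargs parameter to the function, so keyword arguments do not have to be baked in with functools.partial.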

Examples

Using a simple lambda function

import sgepy
func = lambda x: [y**2 for y in range(x)]
w = sgepy.Worker(verbose=True)
w(func, 2)

Test with keyword arguments and package dependencies

import sgepy

# simple function
def func1(x, y=1, z=2):
    time.sleep(x)
    return x * y * z

# cluster worker 
kwargs = {'y' : 2, 'z' : 3}
pkgs = ['time']
w = sgepy.Worker(kwargs=kwargs, pkgs=pkgs, verbose=True)
w(func1, 1)

Using the multiprocessing.Pool() functionality

import sgepy

# simple function (requires import of a package)
def func1(x, y=1, z=2):
    time.sleep(x)
    return x * y * z
    
# map call
kwargs = {'y' : 2, 'z' : 2}
pkgs = ['time']
p = sgepy.Pool(njobs=2, kwargs=kwargs, pkgs=pkgs, verbose=True)
p.map(func1, [1,5])