Skip to content

Commit

Permalink
[COMPUTE-993] nvidiaDriver field support in dx-toolkit (#1399)
Browse files Browse the repository at this point in the history
* add nvidia_driver to dxjob.py

* simplify if statement in dxjob.py

* add nvidia_driver to dxapplet.py

* add nvidia_driver to system_requirements.py

* remove duplicated commment

* add nvidia_driver to dx.py

* add nvidia_driver to dx.py

* uncomment tests

* remove print

* add nvidia check to test

* update readme

* edit test

* add logs

* edit job logs

* add test logs

* remove logs

* edit test

* remove logging

* fix1: edit if condition

* fix2: move nvidia_driver at the end

* fix3: add test to test_dx_bash_helpers.py

* fix4: add job clone TC

* add ''

* fix command

* fix command2

* add 1more TC

* fix4: move nvidia driver param

* fix5: use if any
  • Loading branch information
jsitarova-dnanexus authored Oct 11, 2024
1 parent fdb0855 commit 4648201
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 28 deletions.
8 changes: 8 additions & 0 deletions src/python/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ Example:
$ _DX_DEBUG=1 dx ls
```

### Debugging inside the IDE (PyCharm)
To be able to debug dx-toolkit (dx commands) directly in the IDE, 'Run/Debug Configurations' needs to be changed.
1. Go to Run → Edit Configurations...
2. Add New Configuration (Python)
3. Change script to module (dxpy.scripts.dx)
4. To Script parameters field write dx command you want to run (eg 'ls' runs 'dx ls')
5. Apply and OK (now it is possible to start debugging via main() function in dx.py)

Python coding style
-------------------

Expand Down
12 changes: 8 additions & 4 deletions src/python/dxpy/bindings/dxapplet.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ def _get_run_input_common_fields(executable_input, **kwargs):
if kwargs.get(arg) is not None:
run_input[arg] = kwargs[arg]

if kwargs.get('instance_type') is not None or kwargs.get('cluster_spec') is not None or kwargs.get('fpga_driver') is not None:
if any(kwargs.get(key) is not None for key in ['instance_type', 'cluster_spec', 'fpga_driver', 'nvidia_driver']):
instance_type_srd = SystemRequirementsDict.from_instance_type(kwargs.get('instance_type'))
cluster_spec_srd = SystemRequirementsDict(kwargs.get('cluster_spec'))
fpga_driver_srd = SystemRequirementsDict(kwargs.get('fpga_driver'))
run_input["systemRequirements"] = (instance_type_srd + cluster_spec_srd + fpga_driver_srd).as_dict()
nvidia_driver_srd = SystemRequirementsDict(kwargs.get('nvidia_driver'))
run_input["systemRequirements"] = (instance_type_srd + cluster_spec_srd + fpga_driver_srd + nvidia_driver_srd).as_dict()

if kwargs.get('system_requirements') is not None:
run_input["systemRequirements"] = kwargs.get('system_requirements')
Expand Down Expand Up @@ -195,7 +196,7 @@ def run(self, executable_input, project=None, folder=None, name=None, tags=None,
depends_on=None, allow_ssh=None, debug=None, delay_workspace_destruction=None, priority=None, head_job_on_demand=None,
ignore_reuse=None, ignore_reuse_stages=None, detach=None, cost_limit=None, rank=None, max_tree_spot_wait_time=None,
max_job_spot_wait_time=None, preserve_job_outputs=None, detailed_job_metrics=None, extra_args=None,
fpga_driver=None, system_requirements=None, system_requirements_by_executable=None, **kwargs):
fpga_driver=None, system_requirements=None, system_requirements_by_executable=None, nvidia_driver=None, **kwargs):
'''
:param executable_input: Hash of the executable's input arguments
:type executable_input: dict
Expand Down Expand Up @@ -252,6 +253,8 @@ def run(self, executable_input, project=None, folder=None, name=None, tags=None,
:type system_requirements: dict
:param system_requirements_by_executable: System requirement by executable double mapping
:type system_requirements_by_executable: dict
:param nvidia_driver: a dict mapping function names to nvidia driver requests
:type nvidia_driver: dict
:rtype: :class:`~dxpy.bindings.dxjob.DXJob`
Creates a new job that executes the function "main" of this executable with
Expand Down Expand Up @@ -292,7 +295,8 @@ def run(self, executable_input, project=None, folder=None, name=None, tags=None,
extra_args=extra_args,
fpga_driver=fpga_driver,
system_requirements=system_requirements,
system_requirements_by_executable=system_requirements_by_executable)
system_requirements_by_executable=system_requirements_by_executable,
nvidia_driver=nvidia_driver)
return self._run_impl(run_input, **kwargs)


Expand Down
34 changes: 20 additions & 14 deletions src/python/dxpy/bindings/dxjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@
from ..utils.local_exec_utils import queue_entry_point
from ..compat import basestring


#########
# DXJob #
#########

def new_dxjob(fn_input, fn_name, name=None, tags=None, properties=None, details=None,
instance_type=None, depends_on=None,
cluster_spec=None, fpga_driver=None, system_requirements=None, system_requirements_by_executable=None,
**kwargs):

def new_dxjob(fn_input, fn_name, name=None, tags=None, properties=None, details=None, instance_type=None,
depends_on=None, cluster_spec=None, fpga_driver=None, system_requirements=None,
system_requirements_by_executable=None, nvidia_driver=None, **kwargs):
'''
:param fn_input: Function input
:type fn_input: dict
Expand All @@ -71,6 +72,8 @@ def new_dxjob(fn_input, fn_name, name=None, tags=None, properties=None, details=
:type system_requirements: dict
:param system_requirements_by_executable: System requirement by executable double mapping
:type system_requirements_by_executable: dict
:param nvidia_driver: a dict mapping function names to nvidia driver requests
:type nvidia_driver: dict
:rtype: :class:`~dxpy.bindings.dxjob.DXJob`
Creates and enqueues a new job that will execute a particular
Expand All @@ -94,12 +97,13 @@ def new_dxjob(fn_input, fn_name, name=None, tags=None, properties=None, details=
'''
dxjob = DXJob()
dxjob.new(fn_input, fn_name, name=name, tags=tags, properties=properties,
details=details, instance_type=instance_type, depends_on=depends_on,
cluster_spec=cluster_spec, fpga_driver=fpga_driver,
system_requirements=system_requirements, system_requirements_by_executable=system_requirements_by_executable, **kwargs)
dxjob.new(fn_input, fn_name, name=name, tags=tags, properties=properties, details=details,
instance_type=instance_type, depends_on=depends_on, cluster_spec=cluster_spec, fpga_driver=fpga_driver,
system_requirements=system_requirements, system_requirements_by_executable=system_requirements_by_executable,
nvidia_driver=nvidia_driver, **kwargs)
return dxjob


class DXJob(DXObject):
'''
Remote job object handler.
Expand All @@ -112,10 +116,9 @@ def __init__(self, dxid=None):
DXObject.__init__(self, dxid=dxid)
self.set_id(dxid)

def new(self, fn_input, fn_name, name=None, tags=None, properties=None, details=None,
instance_type=None, depends_on=None,
cluster_spec=None, fpga_driver=None, system_requirements=None, system_requirements_by_executable=None,
**kwargs):
def new(self, fn_input, fn_name, name=None, tags=None, properties=None, details=None, instance_type=None,
depends_on=None, cluster_spec=None, fpga_driver=None, system_requirements=None,
system_requirements_by_executable=None, nvidia_driver=None, **kwargs):
'''
:param fn_input: Function input
:type fn_input: dict
Expand All @@ -141,6 +144,8 @@ def new(self, fn_input, fn_name, name=None, tags=None, properties=None, details=
:type system_requirements: dict
:param system_requirements_by_executable: System requirement by executable double mapping
:type system_requirements_by_executable: dict
:param nvidia_driver: a dict mapping function names to nvidia driver requests
:type nvidia_driver: dict
Creates and enqueues a new job that will execute a particular
function (from the same app or applet as the one the current job
Expand Down Expand Up @@ -179,11 +184,12 @@ def new(self, fn_input, fn_name, name=None, tags=None, properties=None, details=
req_input["tags"] = tags
if properties is not None:
req_input["properties"] = properties
if instance_type is not None or cluster_spec is not None or fpga_driver is not None:
if any(requirement is not None for requirement in [instance_type, cluster_spec, fpga_driver, nvidia_driver]):
instance_type_srd = SystemRequirementsDict.from_instance_type(instance_type, fn_name)
cluster_spec_srd = SystemRequirementsDict(cluster_spec)
fpga_driver_srd = SystemRequirementsDict(fpga_driver)
req_input["systemRequirements"] = (instance_type_srd + cluster_spec_srd + fpga_driver_srd).as_dict()
nvidia_driver_srd = SystemRequirementsDict(nvidia_driver)
req_input["systemRequirements"] = (instance_type_srd + cluster_spec_srd + fpga_driver_srd + nvidia_driver_srd).as_dict()
if system_requirements is not None:
req_input["systemRequirements"] = system_requirements
if system_requirements_by_executable is not None:
Expand Down
14 changes: 10 additions & 4 deletions src/python/dxpy/scripts/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3195,10 +3195,12 @@ def run_body(args, executable, dest_proj, dest_path, preset_inputs=None, input_n
cloned_instance_type = SystemRequirementsDict.from_sys_requirements(cloned_system_requirements, _type='instanceType')
cloned_cluster_spec = SystemRequirementsDict.from_sys_requirements(cloned_system_requirements, _type='clusterSpec')
cloned_fpga_driver = SystemRequirementsDict.from_sys_requirements(cloned_system_requirements, _type='fpgaDriver')
cloned_nvidia_driver = SystemRequirementsDict.from_sys_requirements(cloned_system_requirements, _type='nvidiaDriver')
cloned_system_requirements_by_executable = args.cloned_job_desc.get("mergedSystemRequirementsByExecutable", {}) or {}
else:
cloned_system_requirements = {}
cloned_instance_type, cloned_cluster_spec, cloned_fpga_driver = SystemRequirementsDict({}), SystemRequirementsDict({}), SystemRequirementsDict({})
cloned_instance_type, cloned_cluster_spec, cloned_fpga_driver, cloned_nvidia_driver = (
SystemRequirementsDict({}), SystemRequirementsDict({}), SystemRequirementsDict({}), SystemRequirementsDict({}))
cloned_system_requirements_by_executable = {}

# convert runtime --instance-type into mapping {entrypoint:{'instanceType':xxx}}
Expand Down Expand Up @@ -3227,12 +3229,15 @@ def run_body(args, executable, dest_proj, dest_path, preset_inputs=None, input_n
else:
requested_cluster_spec = cloned_cluster_spec

# fpga driver now does not have corresponding dx run option, so it can only be requested using the cloned value
# fpga/nvidia driver now does not have corresponding dx run option,
# so it can only be requested using the cloned value
requested_fpga_driver = cloned_fpga_driver
requested_nvidia_driver = cloned_nvidia_driver

# combine the requested instance type, full cluster spec, fpga spec
# combine the requested instance type, full cluster spec, fpga spec, nvidia spec
# into the runtime systemRequirements
requested_system_requirements = (requested_instance_type + requested_cluster_spec + requested_fpga_driver).as_dict()
requested_system_requirements = (requested_instance_type + requested_cluster_spec + requested_fpga_driver +
requested_nvidia_driver).as_dict()

if (args.instance_type and cloned_system_requirements_by_executable):
warning = BOLD("WARNING") + ": --instance-type argument: {} may get overridden by".format(args.instance_type)
Expand Down Expand Up @@ -3283,6 +3288,7 @@ def run_body(args, executable, dest_proj, dest_path, preset_inputs=None, input_n
"instance_type": None,
"cluster_spec": None,
"fpga_driver": None,
"nvidia_driver": None,
"stage_instance_types": args.stage_instance_types,
"stage_folders": args.stage_folders,
"rerun_stages": args.rerun_stages,
Expand Down
2 changes: 1 addition & 1 deletion src/python/dxpy/system_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def from_sys_requirements(cls, system_requirements, _type='all'):
It can extract only entrypoints with specific fields ('clusterSpec',
'instanceType', etc), depending on the value of _type.
"""
allowed_types = ['all', 'clusterSpec', 'instanceType', 'fpgaDriver']
allowed_types = ['all', 'clusterSpec', 'instanceType', 'fpgaDriver', 'nvidiaDriver']
if _type not in (allowed_types):
raise DXError("Expected '_type' to be one of the following: {}".format(allowed_types))

Expand Down
16 changes: 16 additions & 0 deletions src/python/test/test_dx_bash_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,22 @@ def test_job_arguments(self):
"other_function": {"fpgaDriver": "edico-1.4.5"}}}})),
{"systemRequirementsByExecutable": {"my_applet":{"main": { "instanceType": "mem2_hdd2_x2", "clusterSpec":{"initialInstanceCount": 3}},
"other_function": { "instanceType": "mem3_ssd2_fpga1_x8", "fpgaDriver": "edico-1.4.5"} }}}),
# nvidia driver
("--instance-type-by-executable " +
pipes.quote(json.dumps({
"my_applet": {
"main": "mem1_ssd1_v2_x2",
"other_function": "mem2_ssd1_gpu_x16"}})) +
" --extra-args " +
pipes.quote(json.dumps({
"systemRequirementsByExecutable": {
"my_applet": {
"main": {"instanceType": "mem2_hdd2_x2"},
"other_function": {"nvidiaDriver": "R535"}}}})),
{"systemRequirementsByExecutable": {
"my_applet": {"main": {"instanceType": "mem2_hdd2_x2"},
"other_function": {"instanceType": "mem2_ssd1_gpu_x16",
"nvidiaDriver": "R535"}}}}),
# properties - mapping
(
"--property foo=foo_value --property bar=bar_value",
Expand Down
56 changes: 53 additions & 3 deletions src/python/test/test_dxclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3332,6 +3332,53 @@ def test_dx_run_sys_reqs(self):
run("dx run " + applet_id +
" --instance-type-by-executable not-a-JSON-string")

def test_dx_run_clone_nvidia_driver(self):
"""
Run the applet and clone the origin job. Verify nvidiaDriver value.
"""
build_nvidia_version = "R535"
run_nvidia_version = "R470"

applet_id = dxpy.api.applet_new({"project": self.project,
"dxapi": "1.0.0",
"runSpec": {"interpreter": "bash",
"distribution": "Ubuntu",
"release": "20.04",
"version": "0",
"code": "echo 'hello'",
"systemRequirements": {
"*": {
"instanceType": "mem2_hdd2_x1",
"nvidiaDriver": build_nvidia_version
}
}}
})['id']

# Run with unchanged nvidia version (build value)
origin_job_id = run(f"dx run {applet_id} --brief -y").strip().split('\n')[-1]
origin_job_desc = dxpy.api.job_describe(origin_job_id)
assert origin_job_desc["systemRequirements"]["*"]["nvidiaDriver"] == build_nvidia_version

cloned_job_id = run(f"dx run --clone {origin_job_id} --brief -y").strip()
cloned_job_desc = dxpy.api.job_describe(cloned_job_id)
assert cloned_job_desc["systemRequirements"]["*"]["nvidiaDriver"] == build_nvidia_version

# Change nvidia driver version in runtime - origin job (run value)
extra_args = json.dumps({"systemRequirements": {"*": {"nvidiaDriver": run_nvidia_version}}})
origin_job_id_nvidia_override = run(f"dx run {applet_id} --extra-args '{extra_args}' --brief -y").strip().split('\n')[-1]
origin_job_desc = dxpy.api.job_describe(origin_job_id_nvidia_override)
assert origin_job_desc["systemRequirements"]["*"]["nvidiaDriver"] == run_nvidia_version

cloned_job_id_nvidia_override = run(f"dx run --clone {origin_job_id_nvidia_override} --brief -y").strip()
cloned_job_desc = dxpy.api.job_describe(cloned_job_id_nvidia_override)
assert cloned_job_desc["systemRequirements"]["*"]["nvidiaDriver"] == run_nvidia_version

# Change nvidia driver version in runtime - cloned job (build value)
extra_args = json.dumps({"systemRequirements": {"*": {"nvidiaDriver": build_nvidia_version}}})
cloned_job_id_nvidia_override = run(f"dx run --clone {origin_job_id_nvidia_override} --extra-args '{extra_args}' --brief -y").strip()
cloned_job_desc = dxpy.api.job_describe(cloned_job_id_nvidia_override)
assert cloned_job_desc["systemRequirements"]["*"]["nvidiaDriver"] == build_nvidia_version

def test_dx_run_clone(self):
applet_id = dxpy.api.applet_new({"project": self.project,
"dxapi": "1.0.0",
Expand Down Expand Up @@ -3626,12 +3673,14 @@ def check_instance_count(job_desc , entrypoints , expected_counts):
check_new_job_metadata(new_job_desc, orig_job_desc,
overridden_fields=['systemRequirements'])

# fpgaDriver override: new original job with extra_args
# fpgaDriver/nvidiaDriver override: new original job with extra_args
orig_job_id = run("dx run " + other_applet_id +
" --instance-count 2 --brief -y " +
"--extra-args '" +
json.dumps({"systemRequirements": {"some_ep": {"clusterSpec": {"initialInstanceCount": 12, "bootstrapScript": "z.sh"},
"fpgaDriver": "edico-1.4.5"}}}) + "'").strip()
json.dumps({"systemRequirements": {"some_ep":
{"clusterSpec": {"initialInstanceCount": 12, "bootstrapScript": "z.sh"},
"fpgaDriver": "edico-1.4.5",
"nvidiaDriver": "R535"}}}) + "'").strip()
orig_job_desc = dxpy.api.job_describe(orig_job_id)
check_instance_count(orig_job_desc, ["main", "some_ep","*"], [2, 12, 2])
# --instance-type and --instance-count override: instance type and cluster spec are resolved independently
Expand All @@ -3650,6 +3699,7 @@ def check_instance_count(job_desc , entrypoints , expected_counts):
self.assertEqual(new_job_desc['systemRequirements']['*']['instanceType'], 'mem2_hdd2_v2_x2')

self.assertEqual(new_job_desc['systemRequirements']['some_ep']['fpgaDriver'], 'edico-1.4.5')
self.assertEqual(new_job_desc['systemRequirements']['some_ep']['nvidiaDriver'], 'R535')
self.assertEqual(new_job_desc['systemRequirements']['some_ep']['clusterSpec']['bootstrapScript'], 'z.sh')

# --instance-type and --instance-type-by-executable override
Expand Down
Loading

0 comments on commit 4648201

Please sign in to comment.