Skip to content

Commit

Permalink
Update Launcher for using local data volumes.
Browse files Browse the repository at this point in the history
Updating Launcher to prepare services with local volume mounts when some
data requirements must be fulfilled by local data on the physical node,
and to update the relevant other args for starting worker services so
that one worker on each node makes sure data gets prepared in local
volumes as needed as part of job startup.
  • Loading branch information
robertbartel committed Aug 6, 2024
1 parent 2e92b13 commit e7b2391
Showing 1 changed file with 46 additions and 9 deletions.
55 changes: 46 additions & 9 deletions python/lib/scheduler/dmod/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def _ds_names_helper(cls, job: 'Job', worker_index: int, category: DataCategory,
return list(dataset_names)

# TODO (later): once we get to dynamic/custom images (i.e., for arbitrary BMI modules), make sure this still works
def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
def _generate_docker_cmd_args(self, job: 'Job', worker_index: int, primary_workers: Dict[str, int]) -> List[str]:
"""
Create the Docker "CMD" arguments list to be used to start all services that will perform this job.
Expand All @@ -379,6 +379,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
The job to have worker Docker services started, with those services needing "CMD" arguments generated.
worker_index : int
The particular worker service index in question, which will have a specific set of data requirements.
primary_workers : Dict[str, int]
Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
Returns
-------
Expand Down Expand Up @@ -407,7 +409,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
"--job-id": str(job.job_id), "--worker-index": str(worker_index)}

if isinstance(job.model_request, AbstractNgenRequest):
docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index))
docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job=job, worker_index=worker_index,
primary_workers=primary_workers))

# Finally, convert the args map to a list, with each "flag"/key immediately preceding its value
args_as_list = []
Expand All @@ -417,7 +420,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:

return args_as_list

def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]:
def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int,
primary_workers: Dict[str, int]) -> Dict[str, str]:
"""
Prepare the specific Docker CMD arg applicable to Nextgen-based jobs
Expand All @@ -432,6 +436,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
The job to have worker Docker services started, with those services needing "CMD" arguments generated.
worker_index : int
The particular worker service index in question, which will have a specific set of data requirements.
primary_workers : Dict[str, int]
Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
Returns
-------
Expand All @@ -452,6 +458,7 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
"--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0],
"--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0],
"--primary-workers": ",".join(str(v) for _, v in primary_workers.items())
}

if job.cpu_count > 1:
Expand Down Expand Up @@ -710,14 +717,43 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
secrets = [self.get_secret_reference(secret_name) for secret_name in
['object_store_exec_user_name', 'object_store_exec_user_passwd']]

# Decide (somewhat trivially) which worker per-host will be considered "primary"
# In particular, this node will handle processing any data that needs to be local on the host
# Also, always make worker 0 primary for its host
per_node_primary_workers = {job.allocations[0].hostname: 0}
for alloc_index in range(1, num_allocations):
if (hostname := job.allocations[alloc_index].hostname) not in per_node_primary_workers:
per_node_primary_workers[hostname] = alloc_index

for alloc_index in range(num_allocations):
alloc = job.allocations[alloc_index]
constraints_str = f"node.hostname == {alloc.hostname}"
constraints = list(constraints_str.split("/"))

pattern = '{}:/dmod/datasets/{}/{}:rw'
mounts = [pattern.format(r.fulfilled_access_at, r.category.name.lower(), r.fulfilled_by) for r in
job.worker_data_requirements[alloc_index] if r.fulfilled_access_at is not None]
mounts = []
for req in job.worker_data_requirements[alloc_index]:
# TODO: (later) not sure, but this seems like it may be problematic condition that requires exception
if req.fulfilled_access_at is None:
logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
f"has no value for `fulfilled_access_at`; skipping this {req.__class__.__name__} "
f"when assembling Docker service mounts.")
continue
cluster_volume = req.fulfilled_access_at
category_subdir = req.category.name.lower()
dataset_name = req.fulfilled_by
mounts.append(f"{cluster_volume}:/dmod/cluster_volumes/{category_subdir}/{dataset_name}")

# Allow this to not have been set (for now at least), but log a warning
if req.needs_data_local is None:
logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
f"does not indicate explicitly whether the required data needs to be available "
f"locally during job execution.")
# For requirements that need local data, mount a local Docker volume for them
elif req.needs_data_local:
# TODO: (later/future) make sure something has checked to see that space is available on the nodes
local_volume = f"{dataset_name}_local_vol"
mounts.append(f"{local_volume}:/dmod/local_volumes/{category_subdir}/{dataset_name}")

#mounts.append('/local/model_as_a_service/docker_host_volumes/forcing_local:/dmod/datasets/forcing_local:rw')
# Introduce a way to inject data access directly via env config, to potentially bypass things for testing
bind_mount_from_env = getenv('DMOD_JOB_WORKER_HOST_MOUNT')
Expand All @@ -744,9 +780,10 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
service_params.capabilities_to_add = ['SYS_ADMIN']

#TODO check for proper service creation, return False if doesn't work
service = self.create_service(serviceParams=service_params, idx=alloc_index,
docker_cmd_args=self._generate_docker_cmd_args(job, alloc_index))

cmd_args = self._generate_docker_cmd_args(job=job, worker_index=alloc_index,
primary_workers=per_node_primary_workers)
service = self.create_service(serviceParams=service_params, idx=alloc_index, docker_cmd_args=cmd_args)

service_per_allocation.append(service)

logging.info("\n")
Expand Down

0 comments on commit e7b2391

Please sign in to comment.