Make deliberately scaled-in unstarted blocks not be failures (#3594)
This PR adds a new terminal job state, SCALED_IN. None of the existing providers will return it, but the scaling layer will use it to mark a job as deliberately scaled in, so that error handling code will not regard it as failed.

Fixes #3568
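
For code that consumes these job statuses, the practical effect is that a deliberately scaled-in block can be excluded when deciding whether an executor is unhealthy. A minimal sketch of that idea, using a hypothetical helper that is not part of this PR (only the JobState/JobStatus names come from the changes below):

from typing import Dict

from parsl.jobs.states import JobState, JobStatus


def count_block_failures(job_status: Dict[str, JobStatus]) -> int:
    # States that indicate a block actually went wrong.
    failure_states = {JobState.FAILED, JobState.TIMEOUT, JobState.MISSING}
    # JobState.SCALED_IN is terminal but deliberate, so it never counts
    # against the executor here.
    return sum(1 for status in job_status.values() if status.state in failure_states)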
benclifford authored Aug 26, 2024
1 parent 789ee82 commit 4ea3fbc
Showing 6 changed files with 102 additions and 10 deletions.
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
@@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]:
         connected_blocks = self.connected_blocks()
         for job_id in job_status:
             job_info = job_status[job_id]
-            if job_info.terminal and job_id not in connected_blocks:
+            if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
+                logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
                 job_status[job_id].state = JobState.MISSING
                 if job_status[job_id].message is None:
                     job_status[job_id].message = (
7 changes: 5 additions & 2 deletions parsl/executors/status_handling.py
@@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
         if block_ids is not None:
             new_status = {}
             for block_id in block_ids:
-                new_status[block_id] = JobStatus(JobState.CANCELLED)
-                del self._status[block_id]
+                logger.debug("Marking block %s as SCALED_IN", block_id)
+                s = JobStatus(JobState.SCALED_IN)
+                new_status[block_id] = s
+                self._status[block_id] = s
+                self._simulated_status[block_id] = s
             self.send_monitoring_info(new_status)
         return block_ids
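
The design choice here is to keep the block's entry in self._status, rather than delete it, and to record a terminal SCALED_IN status that is also forwarded to monitoring. A hedged illustration of the intended behaviour; the executor setup is assumed, while scale_in_facade and status_facade are names that appear elsewhere in this commit:

from parsl.jobs.states import JobState

# Sketch only: `executor` is assumed to be a configured block-provider
# executor (for example a HighThroughputExecutor) with one running block.
scaled_in_ids = executor.scale_in_facade(1)

for block_id in scaled_in_ids:
    status = executor.status_facade[block_id]
    # The record is kept and marked terminal...
    assert status.state == JobState.SCALED_IN
    # ...so later status polls (and the monitoring database) see SCALED_IN
    # rather than a vanished block being rewritten to MISSING.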
7 changes: 6 additions & 1 deletion parsl/jobs/states.py
@@ -46,12 +46,17 @@ class JobState(IntEnum):
     bad worker environment or network connectivity issues.
     """

+    SCALED_IN = 9
+    """This job has been deliberately scaled in. Scaling code should not be concerned
+    that the job never ran (for example for error handling purposes).
+    """
+
     def __str__(self) -> str:
         return f"{self.__class__.__name__}.{self.name}"


 TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
-                   JobState.TIMEOUT, JobState.MISSING]
+                   JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]


 class JobStatus:
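
Because SCALED_IN now appears in TERMINAL_STATES, code asking "is this block finished?" treats it like any other terminal state, while code asking "did this block fail?" can still tell it apart. A small usage sketch, illustrative rather than taken from the PR:

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

status = JobStatus(JobState.SCALED_IN)
assert status.state in TERMINAL_STATES   # finished, so pollers can stop tracking it
assert status.terminal                   # also reachable via the JobStatus.terminal property
assert status.state != JobState.FAILED   # deliberately scaled in, not a failure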
8 changes: 3 additions & 5 deletions parsl/tests/test_htex/test_multiple_disconnected_blocks.py
@@ -21,16 +21,14 @@ def local_config():
                 poll_period=100,
                 max_workers_per_node=1,
                 provider=LocalProvider(
-                    worker_init="conda deactivate; export PATH=''; which python; exit 0",
-                    init_blocks=2,
-                    max_blocks=4,
-                    min_blocks=0,
+                    worker_init="exit 0",
+                    init_blocks=2
                 ),
             )
         ],
         run_dir="/tmp/test_htex",
         max_idletime=0.5,
-        strategy='htex_auto_scale',
+        strategy='none',
     )

2 changes: 1 addition & 1 deletion
@@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy):
         (c, ) = result.first()
         assert c == 1, "There should be a single pending status"

-        result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds)
+        result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds)
         (c, ) = result.first()
         assert c == 1, "There should be a single cancelled status"
85 changes: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
import time

import pytest

import parsl
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider


def local_config():
    # see the comments inside test_regression for reasoning about why each
    # of these parameters is set the way it is.
    return Config(
        max_idletime=1,

        strategy='htex_auto_scale',
        strategy_period=1,

        executors=[
            HighThroughputExecutor(
                label="htex_local",
                encrypted=True,
                provider=LocalProvider(
                    init_blocks=1,
                    min_blocks=0,
                    max_blocks=1,
                    launcher=WrappedLauncher(prepend="sleep inf ; "),
                ),
            )
        ],
    )


@parsl.python_app
def task():
    return 7


@pytest.mark.local
def test_regression(try_assert):
    # The above config means that we should start scaling out one initial
    # block, but then scale it back in after a second or so if the executor
    # is kept idle (which this test does using try_assert).

    # Because of 'sleep inf' in the WrappedLauncher, the block will not ever
    # register.

    # The bug being tested is about mistreatment of blocks which are scaled in
    # before they have a chance to register, and the above forces that to
    # happen.

    # After that scaling in has happened, we should see that we have one block
    # and it should be in a terminal state. The below try_assert waits for
    # that to become true.

    # At that time, we should also see htex reporting no blocks registered - as
    # mentioned above, that is a necessary part of the bug being tested here.

    # Give 10 strategy periods for the above to happen: each step of scale up
    # and scale down due to idleness isn't guaranteed to happen in exactly one
    # scaling step.

    htex = parsl.dfk().executors['htex_local']

    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
               timeout_ms=10000)

    assert htex.connected_blocks() == [], "No block should have connected to interchange"

    # Now we can reconfigure the launcher to let subsequent blocks launch ok,
    # and run a trivial task. That trivial task will scale up a new block and
    # run the task successfully.

    # Prior to issue #3568, the bug was that the scale in of the first
    # block earlier in the test case would have incorrectly been treated as a
    # failure, and then the block error handler would have treated that failure
    # as a permanent htex failure, and so the task execution below would raise
    # a BadStateException rather than attempt to run the task.

    assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty"
    htex.provider.launcher.prepend = ""
    assert task().result() == 7
