Unit Resubmission on Failure and Cancellation
Unit resubmission on failure and cancellation is a frequently requested feature, and several options exist to implement it, on different layers. We first define what 'resubmission' means.
A unit has the following state life cycle (pending states omitted for brevity):
NEW - UMGR_SCHEDULING - UMGR_STAGING_INPUT - AGENT_STAGING_INPUT - AGENT_SCHEDULING - AGENT_EXECUTING - AGENT_STAGING_OUTPUT - UMGR_STAGING_OUTPUT - DONE
If the unit leaves that state progression early for a `FAILED` or `CANCELED` final state, a resubmission could mean one of two things:

- instead of going to `FAILED|CANCELED`, the unit goes to `NEW`, or to some other previous state;
- the unit goes to `FAILED|CANCELED`, and a new, identical unit is submitted in `NEW` state.
Semantically, the two are almost equivalent: once the resubmitted unit is done, the same work has been achieved. The details differ, though: in the first case, the state progression is no longer linear but can contain cycles. In the second case, a new unit with a new unit ID appears out of nowhere.
Since many of our analysis tools rely on a linear state model, and since linear state progressions are also much easier to handle on the implementation side, I currently favor the second approach of submitting identical clones as needed.
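The toy model below is a minimal sketch of the second approach, written under our own assumptions: the `Unit` class, `advance()` and `clone()` are hypothetical illustrations, not RP API. It only allows forward moves along the life cycle above (plus early exits to `FAILED` or `CANCELED`), so a resubmission has to be a clone with a fresh uid, and every unit's state history stays linear.

```python
import itertools

# linear unit life cycle (pending states omitted), as listed above
STATES = ['NEW', 'UMGR_SCHEDULING', 'UMGR_STAGING_INPUT', 'AGENT_STAGING_INPUT',
          'AGENT_SCHEDULING', 'AGENT_EXECUTING', 'AGENT_STAGING_OUTPUT',
          'UMGR_STAGING_OUTPUT', 'DONE']
FINAL = {'DONE', 'FAILED', 'CANCELED'}

_ids = itertools.count()


class Unit(object):
    """Hypothetical toy unit, for illustration only (not an RP class)."""

    def __init__(self, descr):
        self.uid = 'unit.%06d' % next(_ids)
        self.descr = dict(descr)
        self.history = ['NEW']

    @property
    def state(self):
        return self.history[-1]

    def advance(self, new_state):
        # final states are terminal: no transition out of them
        if self.state in FINAL:
            raise ValueError('%s is final (%s)' % (self.uid, self.state))
        # FAILED / CANCELED can be reached from any non-final state
        if new_state not in ('FAILED', 'CANCELED'):
            # all other transitions must move forward along STATES
            if STATES.index(new_state) <= STATES.index(self.state):
                raise ValueError('backward transition %s -> %s'
                                 % (self.state, new_state))
        self.history.append(new_state)

    def clone(self):
        # resubmission by cloning: identical description, new uid
        return Unit(self.descr)


u = Unit({'executable': '/bin/date'})
u.advance('AGENT_EXECUTING')
u.advance('FAILED')
c = u.clone()
print(u.uid, u.history)
print(c.uid, c.history)
```

Looping the failed unit back to `NEW` (the first approach) would be rejected by `advance()` here, which is exactly the property the linear-state analysis tools rely on.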
There is one fundamental semantic requirement: the application must be able to handle the condition that a unit started, aborted, and started again.
Consider the following unit:
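```python
import radical.pilot as rp

cud = rp.ComputeUnitDescription()
cud.executable = '/bin/sh'
# only runs `compute` if the output file does not exist yet
cud.arguments = ['-c', 'test -f /data/out.dat || compute -o /data/out.dat']
```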
That unit will only start `compute` the first time it runs on any given resource: the second time, it will see that the output file exists and exit, no matter whether the output data are complete or not.
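A common way to make such a unit safe to restart, independent of RP, is to write the result to a temporary file and only rename it to its final name once it is complete, so that the existence check really means "complete output exists". The sketch below shows that pattern in Python; `compute_once()` and the `compute` callable are hypothetical, and the atomicity of `os.replace()` assumes both paths live on the same POSIX file system.

```python
import os
import tempfile


def compute_once(out_path, compute, crash=False):
    """Run `compute` unless a complete result already exists (hypothetical helper)."""
    if os.path.exists(out_path):
        return 'skipped'
    tmp_path = out_path + '.tmp'
    with open(tmp_path, 'w') as f:
        f.write(compute())
        if crash:
            # simulate the unit being killed before the result was published
            raise RuntimeError('unit aborted')
    # publish atomically: out_path is either absent or complete
    os.replace(tmp_path, out_path)
    return 'computed'


results = []
with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, 'out.dat')
    try:
        compute_once(out, lambda: 'data', crash=True)
    except RuntimeError:
        results.append(('aborted', os.path.exists(out)))
    results.append(compute_once(out, lambda: 'data'))   # restart: recomputes
    results.append(compute_once(out, lambda: 'data'))   # output complete: skips
    with open(out) as f:
        results.append(f.read())
print(results)
```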
A unit which can be restarted without such problems can be identified by the `restartable` flag:

```python
cud.restartable = True
```

That flag defaults to `False`.
Now that we have the ability to identify which units can be resubmitted, we need to define when they are resubmitted, and how that mechanism is then implemented. The 'when' is not as straightforward to define as one would naively think. A unit with

```python
cud.executable = '/bin/false'
```

will always return a non-zero exit code, and will thus always end up in `FAILED` state. In other words, a unit can fail for legitimate reasons, and resubmitting it (a) won't solve the issue, and (b) would keep the application from ever finishing.
On the other hand, there are clear conditions where resubmission makes immediate sense: a pilot fails for internal reasons while executing the unit; a pilot runs out of runtime; a staging operation fails because of an intermittent network problem; etc.
Alas, RP's unit inspection is currently not fine-grained enough to reliably distinguish those cases. At the moment, a resubmission decision can only be based on the unit state and on application knowledge.
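A resubmission decision based only on unit state and application knowledge could look like the following sketch. The policy is our assumption, not RP behaviour: it resubmits only units flagged `restartable`, only from `FAILED` or `CANCELED`, and caps the number of attempts so that a unit like the `/bin/false` one above cannot keep the application alive forever. `should_resubmit`, `MAX_ATTEMPTS` and the plain-dict unit description are hypothetical.

```python
# final states from which a resubmission is considered at all
RESUBMIT_STATES = {'FAILED', 'CANCELED'}

# hypothetical retry budget per piece of work
MAX_ATTEMPTS = 3


def should_resubmit(descr, state, attempts, max_attempts=MAX_ATTEMPTS):
    """Decide whether a clone of a unit should be submitted.

    descr:    unit description as a plain dict (uses the 'restartable' key)
    state:    final state the unit ended up in
    attempts: how often this work has been submitted so far
    """
    if state not in RESUBMIT_STATES:
        return False                   # DONE, or not final yet
    if not descr.get('restartable', False):
        return False                   # restart may be unsafe; flag defaults to False
    return attempts < max_attempts     # bounded, so '/bin/false' ends eventually


restartable = {'executable': '/bin/false', 'restartable': True}
print([should_resubmit(restartable, 'FAILED', n) for n in range(1, 5)])
print(should_resubmit({'executable': '/bin/false'}, 'FAILED', 1))
print(should_resubmit(restartable, 'DONE', 1))
```

The attempt cap is the crude part: without finer-grained failure information, it is the only thing that separates "retry a transient failure" from "loop on a legitimate failure".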
The resubmission decision can be made on the application layer, or by any RP component.
The application can subscribe to unit state changes and receive the respective notifications via callbacks. Such a callback can resubmit units as needed:
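```python
import threading
import radical.pilot as rp

def unit_state_cb(unit, state, cb_data):
    # resubmit an identical clone of every unit which ends up FAILED
    if state == rp.FAILED:
        # serialize access to the unit manager across callback threads
        with cb_data['lock']:
            print('resubmit %s' % unit.uid)
            cb_data['umgr'].submit_units(unit.description)

# assumes an existing rp.UnitManager instance `umgr`
lock = threading.Lock()
umgr.register_callback(unit_state_cb, cb_data={'lock': lock, 'umgr': umgr})
```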
(Note the use of a `threading.Lock` instance to make sure that the `umgr` is only used by one thread at a time.)
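The callback pattern can be exercised without an RP installation or any resources. In the sketch below, `FakeUnit` and `FakeUmgr` are hypothetical stand-ins (not RP classes) which mimic only what the callback needs: `register_callback()`, `submit_units()`, and callbacks fired from a separate thread per unit. Unit `b` fails on its first run; the callback then submits a clone under the lock and counts attempts per unit name, so that a unit which always fails would be resubmitted only a bounded number of times.

```python
import threading


class FakeUnit(object):
    """Hypothetical stand-in for an RP unit: a uid and a description."""

    def __init__(self, uid, description):
        self.uid = uid
        self.description = description


class FakeUmgr(object):
    """Hypothetical stand-in for an RP unit manager (illustration only).

    Every submitted unit 'runs' in its own thread. Units whose name is in
    `fail_first` fail on their first run and succeed on later runs.
    """

    def __init__(self, fail_first=()):
        self._tlock = threading.Lock()     # protects the fake's own state
        self._fail_first = set(fail_first)
        self._seen = set()
        self._threads = []
        self._cbs = []
        self.final = {}                    # uid -> (name, final state)

    def register_callback(self, cb, cb_data=None):
        self._cbs.append((cb, cb_data))

    def submit_units(self, descr):
        with self._tlock:
            unit = FakeUnit('unit.%04d' % len(self._threads), dict(descr))
            t = threading.Thread(target=self._run, args=(unit,))
            self._threads.append(t)
            t.start()                      # started under the lock: wait() never joins an unstarted thread
        return unit

    def _run(self, unit):
        name = unit.description['name']
        with self._tlock:
            first = name not in self._seen
            self._seen.add(name)
            state = 'FAILED' if (first and name in self._fail_first) else 'DONE'
            self.final[unit.uid] = (name, state)
        for cb, cb_data in self._cbs:      # callbacks run in this unit's thread
            cb(unit, state, cb_data)

    def wait(self):
        # join threads in order; callbacks may append new ones meanwhile
        i = 0
        while True:
            with self._tlock:
                if i >= len(self._threads):
                    return
                t = self._threads[i]
            t.join()
            i += 1


def unit_state_cb(unit, state, cb_data):
    if state != 'FAILED':
        return
    name = unit.description['name']
    # one lock around both the attempt counter and the umgr call
    with cb_data['lock']:
        attempts = cb_data['attempts'].get(name, 1)
        if attempts >= cb_data['max_attempts']:
            return
        cb_data['attempts'][name] = attempts + 1
        print('resubmit %s (%s)' % (unit.uid, name))
        cb_data['umgr'].submit_units(unit.description)


umgr = FakeUmgr(fail_first=['b'])
cb_data = {'lock': threading.Lock(), 'umgr': umgr,
           'attempts': {}, 'max_attempts': 3}
umgr.register_callback(unit_state_cb, cb_data=cb_data)

for name in ['a', 'b', 'c']:
    umgr.submit_units({'name': name})
umgr.wait()
print(sorted(umgr.final.values()))
```

Because the resubmitted clone may be created before or after unit `c`, uids are not deterministic here; the result is therefore reported per unit name.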
RP can internally use a very similar mechanism for unit resubmission, but can inject the new unit more directly into the workload it manages.