
State Management in RADICAL Pilot


This page contains notes related to the architecture of RADICAL-Pilot. Specifically, they include the motivation for the recent (as of Q4/2014) re-architecting of the pilot agent.

Stateful Entities and State Model Progression

RP, like many other distributed systems, can be considered a system of stateful entities and a set of components which advance those entities through their state models. The stateful entities of RP are, at this point:

  • ComputePilots
  • ComputeUnits

The RP components are, at a first level of abstraction:

  • PilotManagers (which manage the state progression of pilots)
  • UnitManagers (which manage the state progression of units)

Implementation-wise, the Manager instances are split over a number of threads and processes (which implement the individual components or parts thereof). Specifically, responsibility for the ComputeUnit state progression starts out with different threads in the application-side RP module (NEW, SCHEDULED, PENDING_INPUT_STAGING, STAGING_INPUT, PENDING_EXECUTING), is then passed to the (also multi-threaded) pilot agent on the target resource (STAGING_INPUT, ALLOCATING, EXECUTING, PENDING_OUTPUT_STAGING, DONE, FAILED, CANCELED), and (for non-final units) is then passed back to the RP application module (STAGING_OUTPUT, DONE, FAILED, CANCELED).

The RADICAL-Pilot state model diagrams are not reproduced on this page.
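
In simplified form, though, the states named throughout the text can be collected into a sketch like the one below (an illustrative sketch only, not the authoritative definitions from the radical.pilot code base; the duplicated STAGING_INPUT reflects the shared responsibility discussed further down):

```python
# Simplified sketch of the state models described on this page -- not the
# authoritative definitions from the radical.pilot code base.

COMPUTE_PILOT_STATES = [
    'NEW',                            # created by the PilotManager
    'LAUNCHING',                      # handed to the PilotLauncher / batch system
    'ACTIVE',                         # the pilot agent runs on the resource
    'DONE', 'FAILED', 'CANCELED',     # final states
]

COMPUTE_UNIT_STATES = [
    # application (client) side
    'NEW', 'SCHEDULED', 'PENDING_INPUT_STAGING', 'STAGING_INPUT',
    'PENDING_EXECUTING',
    # pilot agent side (STAGING_INPUT is shared between module and agent)
    'STAGING_INPUT', 'ALLOCATING', 'EXECUTING', 'PENDING_OUTPUT_STAGING',
    # application side again (only for units which require output staging)
    'STAGING_OUTPUT',
    # final states
    'DONE', 'FAILED', 'CANCELED',
]
```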

Compared to the ComputeUnit state progression, the responsibility for the ComputePilot state progression remains with a single component, the PilotManager, although that component is again implemented in multiple threads which share responsibility for individual state transitions (more on that below).

One important point has to be noted about the ComputeUnit state progression:

  • some states (the staging related states and the final states) are managed by different components, which need to coordinate the respective progressions.
    • the file staging state progression is done by separating out sub-states, i.e. by identifying partial file staging states which are managed by the application module or the agent, not both.
    • the final states are managed by the agent IFF the unit requires no output file staging, and are managed by the application module otherwise.

The general rule is that each implementation component has responsibility for a very specific (set of) state transition(s) -- that component should never perform any other state transitions, but it can also rely on no other component ever interfering with its own implementation of those state transitions.

One side effect of that rule is that the component which currently manages an entity (i.e. a ComputePilot or ComputeUnit) is the authority which defines that entity's state. The information recorded in the database, or contained in caches elsewhere, is a reflection of that authoritative decision, but is not authoritative itself.
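
As an illustration of that rule, a component could guard its state transitions roughly as follows (a minimal sketch with hypothetical names -- Component, advance, update_queue -- not RP's actual implementation):

```python
# Minimal sketch of the ownership rule; class and attribute names are
# hypothetical, not taken from the RP code base.

class Component(object):

    # the transitions this component owns: {from_state: to_state}
    OWNED_TRANSITIONS = {}

    def __init__(self, update_queue):
        self._update_queue = update_queue    # feeds the UpdateWorker

    def advance(self, unit, target_state):
        current = unit['state']
        assert self.OWNED_TRANSITIONS.get(current) == target_state, \
               'component does not own %s -> %s' % (current, target_state)

        # this component is the authority: the transition happens right here ...
        unit['state'] = target_state

        # ... and the database merely reflects that decision, asynchronously.
        self._update_queue.put((unit['uid'], target_state))
```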

Managing ComputeUnit States in the Pilot Agent

From an implementation perspective, the system's state models define (or at least strongly suggest) the implementation granularity: no functional component should be smaller than the management of an individual state. The reason is that if smaller components exist, they immediately imply shared responsibility for a state transition. As state transitions are defined to be atomic and ordered, this in turn implies the need for synchronization and coordination of those smaller components, and thus requires (relatively) tight coupling, which negates the usual objectives of finer implementation granularity. (This is exactly what we see emerging from the shared responsibility for the staging states, which led to the introduction of sub-states that need to be communicated between the staging workers for coordination of the top-level state transition.)

The pilot agent is currently designed around that principle: it is split into components which manage exactly one state (and associated state transitions):

  • the Agent class ingests ComputeUnits into the agent (by pulling ComputeUnits of appropriate state from MongoDB)
  • the StageinWorker class performs agent-level input staging for those ComputeUnits which require it (it shares the management of the STAGING_INPUT state)
  • the Scheduler class allocates a suitable set of compute cores for the ComputeUnits (it manages the ALLOCATING state)
  • the ExecutionWorker class spawns the ComputeUnit processes on the resource, and watches for process completion (it manages the EXECUTING state)
  • the StageoutWorker class again performs agent-level staging for those ComputeUnits which require it (it shares the management of the STAGING_OUTPUT state)

Additionally, there is an UpdateWorker class which communicates state progression information to MongoDB. That class does not manage any ComputeUnit state, but only exists to decouple database interaction from the actual state progression, for performance reasons.

The different PENDING_XYZ states exist for the phases in which ComputeUnit ownership is in limbo, i.e. when a ComputeUnit is being passed from one component to another (usually the ComputeUnit sits in a queue at that point).
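
Put together, the agent can be pictured as a chain of such components connected by queues, with the PENDING_XYZ states corresponding to a unit waiting in one of those queues, and with the UpdateWorker fed from a separate notification queue. The sketch below is illustrative only; the queue and class internals are not the actual agent code:

```python
import queue          # 'Queue' on the Python 2 interpreters RP targeted at the time
import threading

# Queues between agent components; a unit sitting in a queue corresponds to
# one of the PENDING_* states.  All names below are illustrative.
schedule_q = queue.Queue()   # StageinWorker   -> Scheduler
execute_q  = queue.Queue()   # Scheduler       -> ExecutionWorker
stageout_q = queue.Queue()   # ExecutionWorker -> StageoutWorker
update_q   = queue.Queue()   # all components  -> UpdateWorker (state notifications)


class ExecutionWorker(threading.Thread):
    """Owns the EXECUTING state: spawns unit processes and watches them."""

    def run(self):
        while True:
            unit = execute_q.get()               # unit leaves its PENDING state
            unit['state'] = 'EXECUTING'
            update_q.put((unit['uid'], 'EXECUTING'))

            # ... spawn the unit's processes and wait for completion (omitted) ...

            stageout_q.put(unit)                 # hand ownership to the StageoutWorker


class UpdateWorker(threading.Thread):
    """Owns no state at all -- it only pushes state information to MongoDB."""

    def run(self):
        while True:
            uid, state = update_q.get()
            # ... bulk-collect and write the update to MongoDB (omitted) ...
```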

Architecting a Distributed Pilot Agent

How exactly the pilot components listed above are implemented is, very much, an implementation detail: for example, the ExecutionWorker class can be a normal class, a thread, a process on the same node, a process on a different node, a service, or whatever else (as long as ComputeUnit ownership can be communicated back and forth); multiple instances of it can co-exist (as long as it is clear which instance owns which ComputeUnit at any point in time). Thus the granularity of the state model indirectly defines the granularity of the distributed pilot architecture.

Given that scheme, there is still a wide variety of possible distribution schemes: the complete agent can be replicated; the scheduler can be split up (to manage partitions of the resource allocation); the scheduler can feed into one or more, local or remote, execution workers; the staging workers can live on different nodes altogether (or not); etc. The exact scheme chosen is mostly defined by the resource architecture and OS constraints: how many concurrent APRUN processes can exist, whether processes can be spawned on compute nodes, whether compute nodes can communicate with the internet (for file staging, for example).
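
One way to make the chosen scheme explicit would be a per-resource layout description along the following lines (a hypothetical sketch; this is not RP's actual resource configuration format):

```python
# Hypothetical description of one possible agent layout -- illustrative only,
# not RP's actual resource configuration format.
AGENT_LAYOUT = {
    'agent_0': {                            # runs where the batch script executes
        'components': ['Agent', 'Scheduler', 'StageinWorker',
                       'StageoutWorker', 'UpdateWorker'],
    },
    'agent_1': {                            # runs on a compute node
        'components': ['ExecutionWorker', 'ExecutionWorker'],   # two instances
    },
}
```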

Implications for the RP Application Module

The RP application module also consists of different implementation components, i.e. worker and manager threads. At this point, however, there is no clear definition of state ownership, specifically for the pilot states. That is one of the reasons why pilot cancelation is not cleanly implemented: the pilot state Active is managed by one thread, while cancelation is managed by another thread. The other reason for unclean shutdown is that no direct communication path exists between these two threads (communication is performed indirectly via MongoDB, and that channel sometimes breaks during shutdown).

The intent is to restructure the module in a fashion similar to the pilot agent, so that specific states are managed by specific threads (or components), and so that DB interaction is decoupled from the state progression.

The planned implementation will be as follows (summarized in a sketch after the list):

  • the API accepts state enaction requests, via the manager components (pilot_create, pilot_submit, unit_wait etc.)
  • for the pilot, the components / state transitions are:
    • the PilotManager moves a newly created pilot into the NEW state
    • submit_pilot pushes it to the PilotLauncher component, which moves it into the LAUNCHING state.
    • the PilotLauncher pushes the pilot, and control over it, to the resource's batch system, which moves it into the ACTIVE state.
    • all state transitions are communicated asynchronously to MongoDB
  • for the units, the components / state transitions are:
    • the UnitManager moves the units into the NEW state and pushes them to the unit manager scheduler component.
    • the UMGR_Scheduler moves the units into the SCHEDULING state -- the units are on hold until a suitable target pilot is found. Once that happens, the units are pushed to the UMGR_StagingInput component and moved into the UMGR_STAGING_INPUT state.
    • once that staging is completed, the unit is moved into the AGENT_STAGING_INPUT state, and control over the unit state progression is passed on to the pilot agent.
    • the pilot agent enacts the following state progression: AGENT_STAGING_INPUT -> AGENT_SCHEDULING -> EXECUTING -> AGENT_STAGING_OUTPUT
    • at that point, control over the unit is passed back to the client side, to the UMGR_StagingOutput component, and the unit enters the UMGR_STAGING_OUTPUT state.
    • once that stage is completed, the unit is handed back to the unit manager, which moves it into a final state (DONE).
    • again, all state transitions are communicated asynchronously to MongoDB.
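
The division of labor planned above can be summarized as a mapping from states to the components which enact them. The lists below only restate the bullet points; the Python presentation itself is illustrative:

```python
# Planned ComputePilot state progression and the component enacting each state
# (restated from the list above; sketch only).
PILOT_STATE_OWNERS = [
    ('NEW',        'PilotManager'),
    ('LAUNCHING',  'PilotLauncher'),
    ('ACTIVE',     'resource batch system'),
]

# Planned ComputeUnit state progression and the component owning each state.
UNIT_STATE_OWNERS = [
    # state                   owning component       side
    ('NEW',                  'UnitManager',         'client'),
    ('SCHEDULING',           'UMGR_Scheduler',      'client'),
    ('UMGR_STAGING_INPUT',   'UMGR_StagingInput',   'client'),
    ('AGENT_STAGING_INPUT',  'StageinWorker',       'agent'),
    ('AGENT_SCHEDULING',     'Scheduler',           'agent'),
    ('EXECUTING',            'ExecutionWorker',     'agent'),
    ('AGENT_STAGING_OUTPUT', 'StageoutWorker',      'agent'),
    ('UMGR_STAGING_OUTPUT',  'UMGR_StagingOutput',  'client'),
    ('DONE',                 'UnitManager',         'client'),
]
```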

Ordering of States

The discussion above implies that multiple concurrent instances of each RADICAL-Pilot component can co-exist, while still guaranteeing that the ComputeUnit state transitions are always valid transitions according to the state model.

There is one caveat to that observation: if multiple UpdateWorker instances exist, then the information about the state transitions can be communicated out of order. To repeat: the state transitions themselves are in order; only the information about the transitions can be out of order.

As an example, assume the following simplified chain of state transitions: STAGING_INPUT, EXECUTING, STAGING_OUTPUT, DONE. Each state transition is put onto the Updater queue, and UpdateWorker instances compete for those events to push them out to the application. UpdateWorker 1 could win for STAGING_INPUT, EXECUTING and DONE, and UpdateWorker 2 for STAGING_OUTPUT. Due to caching and bulk collection, UpdateWorker 2 could push out its event last, so that the information arrives on the application side in this order: STAGING_INPUT, EXECUTING, DONE, STAGING_OUTPUT.

That is not a problem in itself: remember that all state transitions are in order, and that is what matters. Depending on the use case, though, different mechanisms can be used to mitigate the confusion for the application (which will receive callbacks for those events):

  • missing state transitions can be inferred, and be reported (STAGING_OUTPUT always comes before DONE);
  • reporting DONE can be delayed until information about missing state transitions (STAGING_OUTPUT) has also arrived (see the sketch after this list);
  • event callbacks can be defined to be potentially out-of-order.
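
The second option, for example, could be implemented on the receiving side roughly as follows (a minimal sketch which assumes a fixed, known state ordering and a hypothetical callback interface; it is not part of RP):

```python
# Minimal sketch of the second option: delay reporting a state until all
# earlier states have been reported.  State list and callback are illustrative.

STATE_ORDER = ['STAGING_INPUT', 'EXECUTING', 'STAGING_OUTPUT', 'DONE']


class OrderedNotifier(object):

    def __init__(self, callback):
        self._callback = callback
        self._next_idx = 0          # index of the next state to report
        self._seen     = set()      # states whose information has arrived

    def notify(self, state):
        """Called whenever state information arrives (possibly out of order)."""
        self._seen.add(state)

        # report every state which is now contiguous with what was reported before
        while (self._next_idx < len(STATE_ORDER) and
               STATE_ORDER[self._next_idx] in self._seen):
            self._callback(STATE_ORDER[self._next_idx])
            self._next_idx += 1


# out-of-order arrival as in the example above ...
n = OrderedNotifier(callback=print)
for s in ['STAGING_INPUT', 'EXECUTING', 'DONE', 'STAGING_OUTPUT']:
    n.notify(s)
# ... still reports STAGING_INPUT, EXECUTING, STAGING_OUTPUT, DONE in model order
```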

We leave it open at this point which option is needed (if any). We opt not to enforce ordering of the state transition information, as that implies significant code complexity which we feel is not warranted, given the options for mitigation.
