Skip to content

Commit

Permalink
Executor 2.0: Per-operator stream assignment policy (#5620)
Browse files Browse the repository at this point in the history
Assignment algorithm:
- Each node holds a set of stream ids that are considered ready after the node is done. Each "ready" stream id is associated with a use count. When the stream id is used, it's use count is bumped up by 1. The nodes' ready sets contain the use count at the time at which they were inserted into the ready set.
- The stream for the current node (nodes are processed in topological order) is obtained by looking at the "ready" sets of the preceding nodes and taking the lowest id which have use count equal to one found int the ready set. If the ready sets were empty (or all streams have had their use count bumped up since insertion into the ready set), a new stream id is generated.
- When the stream id is assigned to a node, it's use count is bumped up by 1 and it's inserted into the current node's ready set.
- The ready set for the current node is a union of input nodes' ready set's + current stream id.

----

Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
  • Loading branch information
mzient authored Sep 8, 2024
1 parent 4b19d43 commit 2117f88
Show file tree
Hide file tree
Showing 2 changed files with 688 additions and 3 deletions.
187 changes: 184 additions & 3 deletions dali/pipeline/executor/executor2/stream_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <algorithm>
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <queue>
#include <unordered_map>
#include <set>
#include <utility>
#include <vector>
#include "dali/pipeline/graph/graph_util.h"
Expand All @@ -38,8 +38,6 @@ enum class StreamPolicy : int {
Single, //< There's just one stream that's used by all operators
PerBackend, //< Operators are scheduled on a stream specific to their backend (mixed or GPU)
PerOperator //< Independent operators are executed on separate streams.

// TODO(michalz): implement minimal assignment for PerOperator policy
};


Expand Down Expand Up @@ -164,6 +162,189 @@ class StreamAssignment<StreamPolicy::PerBackend> {
bool has_mixed_ = false;
};

/** Implements per-operator stream assignment.
*
* This policy implements stream assingment such that independent GPU/Mixed operators get
* separate streams. When there's a dependency then one dependent operator shares the stream of
* its predecessor.
*
* Example - numbers are stream indices, "X" means no stream, "s" means synchronization
* ```
* CPU(X) ---- GPU(0) --- GPU(0) -- GPU(0) -- output 0
* \ s
* \ /
* ----- GPU(1) ----
* \
* \
* CPU(X) --- GPU(2) ----s GPU(1) ----------s output 1
* ```
*/
template <>
class StreamAssignment<StreamPolicy::PerOperator> {
public:
explicit StreamAssignment(ExecGraph &graph) {
Assign(graph);
}

std::optional<int> operator[](const ExecNode *node) const {
auto it = node_meta_.find(node);
assert(it != node_meta_.end());
return it->second.stream_id;
}

/** Gets the total number of streams required to run independent operators on separate streams. */
int NumStreams() const {
return total_streams_;
}

private:
struct NodeMeta {
int index; // index in the sorted_nodes_
bool needs_stream = false; // whether this exact node needs a stream
bool gpu_contributor = false; // whether the node contributes to a GPU operator
std::optional<int> stream_id;
std::map<int, int> ready_streams;
};

void Assign(ExecGraph &graph) {
int num_nodes = graph.Nodes().size();

// the nodes in the are already sorted topologically
sorted_nodes_.reserve(num_nodes);
for (auto &node : graph.Nodes()) {
int idx = sorted_nodes_.size();
NodeMeta &meta = node_meta_.insert({ &node, {} }).first->second;
meta.index = idx;
sorted_nodes_.push_back({ &node, &meta });
}

assert(static_cast<size_t>(num_nodes) == sorted_nodes_.size());

FindGPUContributors(graph);
RunAssignment(graph);
ClearCPUStreams();
}


void ClearCPUStreams() {
for (auto &[node, meta] : node_meta_)
if (!meta.needs_stream)
meta.stream_id = std::nullopt;
}


void FindGPUContributors(ExecGraph &graph) {
// Run DFS, output to input, and find nodes which contribute to any node that requires a stream
graph::ClearVisitMarkers(graph.Nodes());
for (auto it = graph.Nodes().rbegin(); it != graph.Nodes().rend(); ++it) {
auto &node = *it;
FindGPUContributors(&node, false);
}
}

void FindGPUContributors(const ExecNode *node, bool is_gpu_contributor) {
graph::Visit v(node);
if (!v)
return;
auto &meta = node_meta_[node];
meta.needs_stream = NeedsStream(node);
if (!is_gpu_contributor)
is_gpu_contributor = meta.needs_stream;
if (is_gpu_contributor)
meta.gpu_contributor = true;
for (auto *inp : node->inputs)
FindGPUContributors(inp->producer, is_gpu_contributor);
}

void RunAssignment(ExecGraph &graph) {
// Process the nodes in topological order.
for (auto [node, meta] : sorted_nodes_) {
ProcessNode(node, meta);
}
}

void ProcessNode(const ExecNode *node, NodeMeta *meta) {
/* The algorithm
Each node has an associated NodeMeta, which contains the stream assignment and a set
of "ready streams". This is the set of streams which are ready after this node is complete.
It includes the streams of the producers + the stream of this node.
Each stream is associated with a "use count" and a ready set contains, alongside the id,
the value of the use count at the time at which the stream was inserted to the set.
Later, when trying to reuse the ready streams, the streams which were inserted with an
outdated use count are rejected.
For each node (topologically sorted):
1a. Pick the producers' ready stream with ths smallest id (reject streams with stale use count).
1b. If there are no ready streams, bump the total number of streams and get a new stream id.
2. Bump the use count of the currently assigned stream.
3. Compute the current ready set as the union of input ready sets + the current stream id.
When computing the union, stale streams are removed to speed up subsequent lookups.
*/

std::optional<int> stream_id = {};

for (auto &e : node->inputs) {
auto &prod_meta = node_meta_[e->producer];
// If we're a GPU contributor (and therefore we need a stream assignment), check if the
// producer's ready stream set contains something.
if (meta->gpu_contributor) {
for (auto it = prod_meta.ready_streams.begin(); it != prod_meta.ready_streams.end(); ++it) {
auto [id, use_count] = *it;
if (use_count < stream_use_count_[id]) {
continue;
}
if (!stream_id.has_value() || *stream_id > id)
stream_id = id;
}
}
// Add producer's ready set to the current one.
CombineReady(*meta, prod_meta);
}

if (stream_id.has_value()) {
UseStream(*meta, *stream_id);
} else {
if (meta->needs_stream) {
stream_id = total_streams_++;
stream_use_count_[*stream_id] = 1;
UseStream(*meta, *stream_id);
}
}

assert(!stream_id.has_value() || meta->ready_streams.count(*stream_id));
}

void UseStream(NodeMeta &meta, int id) {
int use_count = ++stream_use_count_[id];
meta.stream_id = id;
meta.ready_streams[id] = use_count;
}

void CombineReady(NodeMeta &to, NodeMeta &from) {
for (auto it = from.ready_streams.begin(); it != from.ready_streams.end(); ) {
auto [id, old_use_count] = *it;
int current_use_count = stream_use_count_[id];
if (current_use_count > old_use_count) {
it = from.ready_streams.erase(it);
continue;
}
to.ready_streams[id] = current_use_count;
++it;
}
}

int total_streams_ = 0;

std::unordered_map<const ExecNode *, NodeMeta> node_meta_;
std::unordered_map<int, int> stream_use_count_;
std::vector<std::pair<const ExecNode *, NodeMeta *>> sorted_nodes_;
};

} // namespace exec2
} // namespace dali

Expand Down
Loading

0 comments on commit 2117f88

Please sign in to comment.