Executor 2.0: Per-operator stream assignment policy #5620

Merged · 4 commits · Sep 8, 2024
187 changes: 184 additions & 3 deletions dali/pipeline/executor/executor2/stream_assignment.h
@@ -18,10 +18,10 @@
#include <algorithm>
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <queue>
#include <unordered_map>
#include <set>
#include <utility>
#include <vector>
#include "dali/pipeline/graph/graph_util.h"
@@ -38,8 +38,6 @@ enum class StreamPolicy : int {
Single, //< There's just one stream that's used by all operators
PerBackend, //< Operators are scheduled on a stream specific to their backend (mixed or GPU)
PerOperator //< Independent operators are executed on separate streams.

// TODO(michalz): implement minimal assignment for PerOperator policy
};


@@ -164,6 +162,189 @@ class StreamAssignment<StreamPolicy::PerBackend> {
bool has_mixed_ = false;
};

/** Implements per-operator stream assignment.
*
* This policy implements stream assignment such that independent GPU/Mixed operators get
* separate streams. When there's a dependency, one of the dependent operators shares the
* stream of its predecessor.
*
* Example - numbers are stream indices, "X" means no stream, "s" means synchronization
* ```
* CPU(X) ---- GPU(0) --- GPU(0) -- GPU(0) -- output 0
* \ s
* \ /
* ----- GPU(1) ----
* \
* \
* CPU(X) --- GPU(2) ----s GPU(1) ----------s output 1
* ```
*/
template <>
class StreamAssignment<StreamPolicy::PerOperator> {
public:
explicit StreamAssignment(ExecGraph &graph) {
Assign(graph);
}

std::optional<int> operator[](const ExecNode *node) const {
auto it = node_meta_.find(node);
assert(it != node_meta_.end());
return it->second.stream_id;
}

/** Gets the total number of streams required to run independent operators on separate streams. */
int NumStreams() const {
return total_streams_;
}

private:
struct NodeMeta {
int index; // index in the sorted_nodes_
bool needs_stream = false; // whether this exact node needs a stream
bool gpu_contributor = false; // whether the node contributes to a GPU operator
std::optional<int> stream_id; // the stream assigned to this node, if any
std::map<int, int> ready_streams; // stream id -> use count at the time of insertion
};

void Assign(ExecGraph &graph) {
int num_nodes = graph.Nodes().size();

// the nodes in the graph are already sorted topologically
sorted_nodes_.reserve(num_nodes);
for (auto &node : graph.Nodes()) {
int idx = sorted_nodes_.size();
NodeMeta &meta = node_meta_.insert({ &node, {} }).first->second;
meta.index = idx;
sorted_nodes_.push_back({ &node, &meta });
}

assert(static_cast<size_t>(num_nodes) == sorted_nodes_.size());

FindGPUContributors(graph);
RunAssignment(graph);
ClearCPUStreams();
}


void ClearCPUStreams() {
for (auto &[node, meta] : node_meta_)
if (!meta.needs_stream)
meta.stream_id = std::nullopt;
}


void FindGPUContributors(ExecGraph &graph) {
// Run DFS, output to input, and find nodes which contribute to any node that requires a stream
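// E.g., in the class comment's example both CPU operators are GPU contributors, because
// their outputs (directly or through other operators) feed operators that need a stream.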
graph::ClearVisitMarkers(graph.Nodes());
for (auto it = graph.Nodes().rbegin(); it != graph.Nodes().rend(); ++it) {
auto &node = *it;
FindGPUContributors(&node, false);
}
}

void FindGPUContributors(const ExecNode *node, bool is_gpu_contributor) {
graph::Visit v(node);
if (!v)
return;
auto &meta = node_meta_[node];
meta.needs_stream = NeedsStream(node);
if (!is_gpu_contributor)
is_gpu_contributor = meta.needs_stream;
if (is_gpu_contributor)
meta.gpu_contributor = true;
for (auto *inp : node->inputs)
FindGPUContributors(inp->producer, is_gpu_contributor);
}

void RunAssignment(ExecGraph &graph) {
// Process the nodes in topological order.
for (auto [node, meta] : sorted_nodes_) {
ProcessNode(node, meta);
}
}

void ProcessNode(const ExecNode *node, NodeMeta *meta) {
/* The algorithm

Each node has an associated NodeMeta, which contains the stream assignment and a set
of "ready streams". This is the set of streams which are ready after this node is complete.
It includes the streams of the producers + the stream of this node.
Each stream is associated with a "use count", and each ready-set entry stores, alongside the id,
the value of the use count at the time the stream was inserted into the set.
Later, when trying to reuse the ready streams, entries that were inserted with an
outdated use count are rejected.

For each node (topologically sorted):

1a. Pick the producers' ready stream with the smallest id (rejecting streams with a stale use count).
1b. If there are no ready streams, bump the total number of streams and get a new stream id.

2. Bump the use count of the currently assigned stream.

3. Compute the current ready set as the union of input ready sets + the current stream id.

When computing the union, stale streams are removed to speed up subsequent lookups.
*/
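
/* Example trace (graph from the class comment; node names here are just illustrative):
   - the first GPU node of the top chain finds no ready streams in its CPU producer,
     so it allocates a new stream: 0
   - the independent GPU(1) branch also finds no ready streams and allocates stream 1
   - the GPU node that merges both branches sees {0, 1} ready and reuses the smallest
     fresh id, 0; this bumps stream 0's use count, so the stale entry in its producer's
     ready set can no longer be picked up by other consumers
   - the GPU node before output 1 sees {1, 2} ready and reuses stream 1, which is why
     the last GPU node of the bottom chain shares the stream of the middle branch
*/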

std::optional<int> stream_id = {};

for (auto &e : node->inputs) {
auto &prod_meta = node_meta_[e->producer];
// If we're a GPU contributor (and therefore we need a stream assignment), check if the
// producer's ready stream set contains a stream we can reuse.
if (meta->gpu_contributor) {
for (auto it = prod_meta.ready_streams.begin(); it != prod_meta.ready_streams.end(); ++it) {
auto [id, use_count] = *it;
if (use_count < stream_use_count_[id]) {
continue;
}
if (!stream_id.has_value() || *stream_id > id)
stream_id = id;
}
}
// Add producer's ready set to the current one.
CombineReady(*meta, prod_meta);
}

if (stream_id.has_value()) {
UseStream(*meta, *stream_id);
} else {
if (meta->needs_stream) {
stream_id = total_streams_++;
stream_use_count_[*stream_id] = 1;
UseStream(*meta, *stream_id);
}
}

assert(!stream_id.has_value() || meta->ready_streams.count(*stream_id));
}

void UseStream(NodeMeta &meta, int id) {
int use_count = ++stream_use_count_[id];
meta.stream_id = id;
meta.ready_streams[id] = use_count;
}

void CombineReady(NodeMeta &to, NodeMeta &from) {
for (auto it = from.ready_streams.begin(); it != from.ready_streams.end(); ) {
auto [id, old_use_count] = *it;
int current_use_count = stream_use_count_[id];
if (current_use_count > old_use_count) {
it = from.ready_streams.erase(it);
continue;
}
to.ready_streams[id] = current_use_count;
++it;
}
}

int total_streams_ = 0;

std::unordered_map<const ExecNode *, NodeMeta> node_meta_;
std::unordered_map<int, int> stream_use_count_;
std::vector<std::pair<const ExecNode *, NodeMeta *>> sorted_nodes_;
};

} // namespace exec2
} // namespace dali
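
Below is a minimal usage sketch of the new assignment class under the PerOperator policy; the helper function name and the way the result is consumed are illustrative assumptions, while StreamAssignment, StreamPolicy, operator[] and NumStreams() come from the header in this PR.

```cpp
#include <optional>

#include "dali/pipeline/executor/executor2/stream_assignment.h"

namespace dali {
namespace exec2 {

// Hypothetical helper: assigns a stream index to every node of an already-built graph.
inline void ExampleAssignStreams(ExecGraph &graph) {
  StreamAssignment<StreamPolicy::PerOperator> assignment(graph);

  // Independent GPU/Mixed chains get distinct indices, so a pool of this many
  // streams is enough to run them concurrently.
  int num_streams = assignment.NumStreams();
  (void)num_streams;  // e.g. use it to size a CUDA stream pool

  for (auto &node : graph.Nodes()) {
    std::optional<int> stream_idx = assignment[&node];
    if (stream_idx.has_value()) {
      // schedule `node` on the stream with index *stream_idx
    } else {
      // CPU-only operator - it runs without a dedicated stream
    }
  }
}

}  // namespace exec2
}  // namespace dali
```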
