Skip to content

Commit

Permalink
Per-operator stream assignment.
Browse files Browse the repository at this point in the history
It works by traversing the graph in depth-first fashion, from outputs
to inputs. Each node has a set of "ready" streams which are merged
when the graph converges. When a stream is reused in a subsequent
node, the stream id is removed from the producer's ready set
and the removal happens recursively, but only once for each stream id.

Typical complexity is O(V+E), pessimistic complexity is O(V+E^2).

Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
  • Loading branch information
mzient committed Sep 4, 2024
1 parent bd304b7 commit 610fd7d
Show file tree
Hide file tree
Showing 2 changed files with 657 additions and 2 deletions.
155 changes: 153 additions & 2 deletions dali/pipeline/executor/executor2/stream_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ enum class StreamPolicy : int {
Single, //< There's just one stream that's used by all operators
PerBackend, //< Operators are scheduled on a stream specific to their backend (mixed or GPU)
PerOperator //< Independent operators are executed on separate streams.

// TODO(michalz): implement minimal assignment for PerOperator policy
};


Expand Down Expand Up @@ -164,6 +162,159 @@ class StreamAssignment<StreamPolicy::PerBackend> {
bool has_mixed_ = false;
};

/** Implements per-operator stream assignment.
*
* This policy implements stream assingment such that independent GPU/Mixed operators get
* separate streams. When there's a dependency then one dependent operator shares the stream of
* its predecessor.
*
* Example - numbers are stream indices, "X" means no stream, "s" means synchronization
* ```
* CPU(X) ---- GPU(0) --- GPU(0) -- GPU(0) -- output 0
* \ s
* \ /
* ----- GPU(1) ----
* \
* \
* CPU(X) --- GPU(2) ----s GPU(1) ----------s output 1
* ```
*/
template <>
class StreamAssignment<StreamPolicy::PerOperator> {
public:
explicit StreamAssignment(ExecGraph &graph) {
Assign(graph);
}

std::optional<int> operator[](const ExecNode *node) const {
auto it = node_meta_.find(node);
assert(it != node_meta_.end());
return it->second.stream_id;
}

/** Gets the total number of streams required to run independent operators on separate streams. */
int NumStreams() const {
return total_streams_;
}

private:
struct NodeMeta {
int index; // index in the sorted_nodes_
bool needs_stream = false; // whether this exact node needs a stream
bool gpu_contributor = false; // whether the node contributes to a GPU operator
std::optional<int> stream_id;
std::set<int> ready_streams;
};

void Assign(ExecGraph &graph) {
int num_nodes = graph.Nodes().size();

// the nodes in the graph must be sorted topologically
sorted_nodes_.reserve(num_nodes);
for (auto &node : graph.Nodes()) {
int idx = sorted_nodes_.size();
NodeMeta &meta = node_meta_.insert({ &node, {} }).first->second;
meta.index = idx;
sorted_nodes_.push_back({ &node, &meta });
}

assert(static_cast<size_t>(num_nodes) == sorted_nodes_.size());

FindGPUContributors(graph);
RunAssignment(graph);
ClearCPUStreams();
}


void ClearCPUStreams() {
for (auto &[node, meta] : node_meta_)
if (!meta.needs_stream)
meta.stream_id = std::nullopt;
}


void FindGPUContributors(ExecGraph &graph) {
// Run DFS, output to input, and find nodes which contribute to any node that requires a stream
graph::ClearVisitMarkers(graph.Nodes());
for (auto it = graph.Nodes().rbegin(); it != graph.Nodes().rend(); ++it) {
auto &node = *it;
FindGPUContributors(&node, false);
}
}

void FindGPUContributors(const ExecNode *node, bool is_gpu_contributor) {
graph::Visit v(node);
if (!v)
return;
auto &meta = node_meta_[node];
meta.needs_stream = NeedsStream(node);
if (!is_gpu_contributor)
is_gpu_contributor = meta.needs_stream;
if (is_gpu_contributor)
meta.gpu_contributor = true;
for (auto *inp : node->inputs)
FindGPUContributors(inp->producer, is_gpu_contributor);
}

void RunAssignment(ExecGraph &graph) {
graph::ClearVisitMarkers(graph.Nodes());
for (int i = sorted_nodes_.size() - 1; i >= 0; i--) {
ProcessNode(sorted_nodes_[i].first, sorted_nodes_[i].second);
}
}

void ProcessNode(const ExecNode *node, NodeMeta *meta) {
graph::Visit v(node);
if (!v)
return;

auto &stream_id = meta->stream_id;
assert(!stream_id.has_value());

for (auto &e : node->inputs) {
auto &prod_meta = node_meta_[e->producer];
ProcessNode(e->producer, &prod_meta);
if (meta->gpu_contributor && !prod_meta.ready_streams.empty()) {
if (!stream_id.has_value() || *stream_id > *prod_meta.ready_streams.begin()) {
stream_id = *prod_meta.ready_streams.begin();
}
}
meta->ready_streams.insert(prod_meta.ready_streams.begin(), prod_meta.ready_streams.end());
}

if (stream_id.has_value()) {
for (auto &e : node->inputs) {
EraseReady(e->producer, *stream_id, node);
}
} else {
if (meta->needs_stream) {
stream_id = total_streams_++;
meta->ready_streams.insert(*stream_id);
}
}

assert(!stream_id.has_value() || meta->ready_streams.count(*stream_id));
}

void EraseReady(const ExecNode *node, int id, const ExecNode *sentinel) {
if (node == sentinel)
return;
auto &meta = node_meta_[node];
if (meta.ready_streams.erase(id)) {
for (auto &e : node->inputs)
EraseReady(e->producer, id, sentinel);
for (auto &out : node->outputs)
for (auto &e : out.consumers)
EraseReady(e->consumer, id, sentinel);
}
}

int total_streams_ = 0;

std::unordered_map<const ExecNode *, NodeMeta> node_meta_;
std::vector<std::pair<const ExecNode *, NodeMeta *>> sorted_nodes_;
};

} // namespace exec2
} // namespace dali

Expand Down
Loading

0 comments on commit 610fd7d

Please sign in to comment.