[SYCL][Graph] Avoid unnecessary inter-partition dependencies
Improves the management of inter-partition dependencies, so that only the required dependencies are added.
As removing these dependencies can result in multiple execution paths, a map has been added to track all events returned from submitted partitions.
All these events are linked to the main event returned to the user.
Adds tests.
mfrancepillois committed Feb 9, 2024
1 parent 3c39d13 commit ddfd622
Showing 11 changed files with 409 additions and 2 deletions.
26 changes: 26 additions & 0 deletions sycl/doc/design/CommandGraph.md
@@ -192,6 +192,32 @@ illustrated in the following diagrams:
![Graph partition illustration step 10b.](images/SYCL-Graph-partitions_step11.jpg)
![Graph partition illustration step 11b.](images/SYCL-Graph-partitions_step12.jpg)

### Multiple Roots Execution Flow
The following diagram shows the partitions of a graph with two roots
and a host-task in each branch.

![Multiple roots graph partition illustration.](images/SYCL-Graph-multiple_roots_partitions.jpg)

When executing this graph, partitions were previously enqueued one after
the other, with each partition waiting for the previous one to complete
(see the top of the following diagram).
For a multi-root graph, however, this behavior adds unnecessary
dependencies between partitions, slowing down the execution of the
whole graph.
Now, we keep track of the actual predecessors of each partition and only
enforce dependencies between partitions when necessary.
In our example, the extra dependency is therefore removed and both
branches can be executed concurrently.
As the diagram shows, however, this new approach can result in multiple
execution tails, which makes it harder to determine when the graph
execution has finished.
To cope with this issue, the events associated with the completion of
each partition are linked to the event returned to the user.
Hence, when the returned event is complete, we can guarantee that all
work associated with the graph has completed.

![Multiple roots graph partition execution flow.](images/SYCL-Graph-partition_execution_flow.jpg)
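For illustration, here is a minimal, self-contained sketch of how such a
two-root graph with a host-task in each branch might be built using the
explicit graph API (the queue setup, data sizes, and kernel bodies are
illustrative assumptions, not part of this change):

```cpp
#include <sycl/sycl.hpp>

namespace exp_ext = sycl::ext::oneapi::experimental;

int main() {
  sycl::queue Queue;
  exp_ext::command_graph Graph{Queue.get_context(), Queue.get_device()};

  constexpr size_t Size = 1024;
  int *PtrA = sycl::malloc_shared<int>(Size, Queue);
  int *PtrB = sycl::malloc_shared<int>(Size, Queue);

  // Root 1: a kernel followed by a host-task.
  auto KernelA = Graph.add([&](sycl::handler &CGH) {
    CGH.parallel_for(sycl::range<1>{Size},
                     [=](sycl::item<1> Id) { PtrA[Id] = 1; });
  });
  Graph.add(
      [&](sycl::handler &CGH) {
        CGH.host_task([=]() { /* CPU-side work on PtrA */ });
      },
      {exp_ext::property::node::depends_on{KernelA}});

  // Root 2: an independent branch with its own host-task.
  auto KernelB = Graph.add([&](sycl::handler &CGH) {
    CGH.parallel_for(sycl::range<1>{Size},
                     [=](sycl::item<1> Id) { PtrB[Id] = 2; });
  });
  Graph.add(
      [&](sycl::handler &CGH) {
        CGH.host_task([=]() { /* CPU-side work on PtrB */ });
      },
      {exp_ext::property::node::depends_on{KernelB}});

  auto ExecGraph = Graph.finalize();

  // With per-partition dependency tracking, the two branches may run
  // concurrently; the returned event completes only once all partitions
  // have finished.
  Queue.ext_oneapi_graph(ExecGraph).wait();

  sycl::free(PtrA, Queue);
  sycl::free(PtrB, Queue);
  return 0;
}
```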


## Memory handling: Buffer and Accessor

There is no extra support for graph-specific USM allocations in the current
sycl/doc/design/images/SYCL-Graph-multiple_roots_partitions.jpg (new binary image file; not displayed)
sycl/doc/design/images/SYCL-Graph-partition_execution_flow.jpg (new binary image file; not displayed)
38 changes: 36 additions & 2 deletions sycl/source/detail/graph_impl.cpp
@@ -263,6 +263,18 @@ void exec_graph_impl::makePartitions() {
Partition->MSchedule.end());
}

// Compute partition dependencies
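// A node's owning partition index is recorded in MPartitionNodes, so each
// root's predecessor node can be resolved to the partition that contains it.
// Note that a predecessor partition may appear more than once in
// MPredecessors when several roots depend on nodes in the same partition.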
for (auto Partition : MPartitions) {
for (auto const &Root : Partition->MRoots) {
auto RootNode = Root.lock();
for (const auto &Dep : RootNode->MPredecessors) {
auto NodeDep = Dep.lock();
Partition->MPredecessors.push_back(
MPartitions[MPartitionNodes[NodeDep]]);
}
}
}

// Reset node groups (if nodes have to be re-processed, e.g. for subgraphs)
for (auto &Node : MGraphImpl->MNodeStorage) {
Node->MPartitionNum = -1;
@@ -761,7 +773,22 @@ exec_graph_impl::enqueue(const std::shared_ptr<sycl::detail::queue_impl> &Queue,
});

sycl::detail::EventImplPtr NewEvent;
for (auto CurrentPartition : MPartitions) {
std::vector<sycl::detail::EventImplPtr> BackupCGDataMEvents;
if (MPartitions.size() > 1) {
BackupCGDataMEvents = CGData.MEvents;
}
for (uint32_t currentPartitionsNum = 0;
currentPartitionsNum < MPartitions.size(); currentPartitionsNum++) {
auto CurrentPartition = MPartitions[currentPartitionsNum];
// Restore the initial MEvents so that only the needed additional dependencies are added
if (currentPartitionsNum > 0) {
CGData.MEvents = BackupCGDataMEvents;
}

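// Add one event dependency per actual predecessor partition, rather than
// unconditionally depending on the partition that was enqueued just before.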
for (auto const &DepPartition : CurrentPartition->MPredecessors) {
CGData.MEvents.push_back(MPartitionsExecutionEvents[DepPartition]);
}

auto CommandBuffer =
CurrentPartition->MPiCommandBuffers[Queue->get_device()];

@@ -901,12 +928,19 @@ exec_graph_impl::enqueue(const std::shared_ptr<sycl::detail::queue_impl> &Queue,
NewEvent->setStateIncomplete();
NewEvent->getPreparedDepsEvents() = ScheduledEvents;
}
CGData.MEvents.push_back(NewEvent);
MPartitionsExecutionEvents[CurrentPartition] = NewEvent;
}

// Keep track of this execution event so we can make sure it's completed in
// the destructor.
MExecutionEvents.push_back(NewEvent);
// Attach events of previous partitions to ensure that, when the returned
// event is complete, all execution associated with the graph has completed.
for (auto const &Elem : MPartitionsExecutionEvents) {
if (Elem.second != NewEvent) {
NewEvent->attachEventToComplete(Elem.second);
}
}
sycl::event QueueEvent =
sycl::detail::createSyclObjFromImpl<sycl::event>(NewEvent);
return QueueEvent;
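The event-linking pattern above can be pictured outside the SYCL runtime
with plain standard-library primitives: the event handed back to the user
keeps references to every partition's tail event and is not considered
complete until all of them are. A hypothetical, self-contained model
(`Event`, `attach`, and `wait` are stand-in names, not runtime API):

```cpp
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for the runtime's event type.
struct Event {
  std::thread Worker;                           // the "execution" behind this event
  std::vector<std::shared_ptr<Event>> Attached; // linked tail events

  void attach(std::shared_ptr<Event> E) { Attached.push_back(std::move(E)); }

  // Waiting on the main event also waits on every attached tail event,
  // mirroring the role of attachEventToComplete above.
  void wait() {
    if (Worker.joinable())
      Worker.join();
    for (auto &E : Attached)
      E->wait();
  }
};

int main() {
  auto MainEvent = std::make_shared<Event>();
  // Two concurrent "partitions" with no dependency between them.
  auto TailA = std::make_shared<Event>();
  auto TailB = std::make_shared<Event>();
  TailA->Worker = std::thread([] { /* partition A work */ });
  TailB->Worker = std::thread([] { /* partition B work */ });

  MainEvent->attach(TailA);
  MainEvent->attach(TailB);
  MainEvent->wait(); // all partition work is guaranteed to have completed
  return 0;
}
```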
5 changes: 5 additions & 0 deletions sycl/source/detail/graph_impl.hpp
@@ -496,6 +496,8 @@ class partition {
/// Map of devices to command buffers.
std::unordered_map<sycl::device, sycl::detail::pi::PiExtCommandBuffer>
MPiCommandBuffers;
/// List of predecessors to this partition.
std::vector<std::shared_ptr<partition>> MPredecessors;

/// @return True if the partition contains a host task
bool isHostTask() const {
@@ -1072,6 +1074,9 @@ class exec_graph_impl {
std::vector<sycl::detail::EventImplPtr> MExecutionEvents;
/// List of the partitions that compose the exec graph.
std::vector<std::shared_ptr<partition>> MPartitions;
/// Map of the partitions to their execution events
std::unordered_map<std::shared_ptr<partition>, sycl::detail::EventImplPtr>
MPartitionsExecutionEvents;
};

} // namespace detail
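A detail worth noting about the new map: `std::unordered_map` keyed by
`std::shared_ptr` hashes and compares the stored raw pointer, so partitions
are looked up by identity rather than by value. A tiny illustration (the
`Partition` stub is hypothetical):

```cpp
#include <cassert>
#include <memory>
#include <unordered_map>

struct Partition {}; // stand-in for the runtime's partition class

int main() {
  auto P1 = std::make_shared<Partition>();
  auto P2 = std::make_shared<Partition>();

  std::unordered_map<std::shared_ptr<Partition>, int> Events;
  Events[P1] = 1;
  Events[P2] = 2;

  // Keys compare by pointer identity: distinct partitions never collide,
  // and any copy of the same shared_ptr finds the same entry.
  auto P1Copy = P1;
  assert(Events[P1Copy] == 1);
  assert(Events.size() == 2);
  return 0;
}
```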
9 changes: 9 additions & 0 deletions sycl/test-e2e/Graph/Explicit/host_task2_multiple_roots.cpp
@@ -0,0 +1,9 @@
// RUN: %{build} -o %t.out
// RUN: %{run} %t.out
// Extra run to check for leaks in Level Zero using UR_L0_LEAKS_DEBUG
// RUN: %if level_zero %{env UR_L0_LEAKS_DEBUG=1 %{run} %t.out 2>&1 | FileCheck %s --implicit-check-not=LEAK %}
//

#define GRAPH_E2E_EXPLICIT

#include "../Inputs/host_task2_multiple_roots.cpp"
9 changes: 9 additions & 0 deletions sycl/test-e2e/Graph/Explicit/host_task_multiple_roots.cpp
@@ -0,0 +1,9 @@
// RUN: %{build} -o %t.out
// RUN: %{run} %t.out
// Extra run to check for leaks in Level Zero using UR_L0_LEAKS_DEBUG
// RUN: %if level_zero %{env UR_L0_LEAKS_DEBUG=1 %{run} %t.out 2>&1 | FileCheck %s --implicit-check-not=LEAK %}
//

#define GRAPH_E2E_EXPLICIT

#include "../Inputs/host_task_multiple_roots.cpp"
174 changes: 174 additions & 0 deletions sycl/test-e2e/Graph/Inputs/host_task2_multiple_roots.cpp
@@ -0,0 +1,174 @@
// This test uses a host_task when adding a command_graph node for a graph
// with multiple roots.

#include "../graph_common.hpp"

int main() {
queue Queue{{sycl::ext::intel::property::queue::no_immediate_command_list{}}};

if (!are_graphs_supported(Queue)) {
return 0;
}

using T = int;

if (!Queue.get_device().has(sycl::aspect::usm_shared_allocations)) {
return 0;
}

const T ModValue = T{7};
std::vector<T> DataA(Size), DataB(Size), DataC(Size), Res2(Size);

std::iota(DataA.begin(), DataA.end(), 1);
std::iota(DataB.begin(), DataB.end(), 10);
std::iota(DataC.begin(), DataC.end(), 1000);

std::vector<T> Reference(DataC);
for (unsigned n = 0; n < Iterations; n++) {
for (size_t i = 0; i < Size; i++) {
Reference[i] = (((DataA[i] + DataB[i]) * ModValue) + 1) * DataB[i];
}
}

exp_ext::command_graph Graph{Queue.get_context(), Queue.get_device()};

T *PtrA = malloc_device<T>(Size, Queue);
T *PtrB = malloc_device<T>(Size, Queue);
T *PtrC = malloc_shared<T>(Size, Queue);
T *PtrA2 = malloc_device<T>(Size, Queue);
T *PtrB2 = malloc_device<T>(Size, Queue);
T *PtrC2 = malloc_shared<T>(Size, Queue);

Queue.copy(DataA.data(), PtrA, Size);
Queue.copy(DataB.data(), PtrB, Size);
Queue.copy(DataC.data(), PtrC, Size);
Queue.copy(DataA.data(), PtrA2, Size);
Queue.copy(DataB.data(), PtrB2, Size);
Queue.copy(DataC.data(), PtrC2, Size);
Queue.wait_and_throw();

// Copy input values to the output
auto NodeA = add_node(Graph, Queue, [&](handler &CGH) {
CGH.parallel_for(range<1>(Size), [=](item<1> id) { PtrC[id] = PtrA[id]; });
});

// Vector add to output
auto NodeB = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeA);
CGH.parallel_for(range<1>(Size),
[=](item<1> id) { PtrC[id] += PtrB[id]; });
},
NodeA);

// Modify the output values in a host_task
auto NodeC = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeB);
CGH.host_task([=]() {
for (size_t i = 0; i < Size; i++) {
PtrC[i] *= ModValue;
}
});
},
NodeB);

// Increment the output values
auto NodeD = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeC);
CGH.parallel_for(range<1>(Size), [=](item<1> id) { PtrC[id] += 1; });
},
NodeC);

// Multiply the output values by PtrB
add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeD);
CGH.parallel_for(range<1>(Size),
[=](item<1> id) { PtrC[id] *= PtrB[id]; });
},
NodeD);

// Copy input values to the output
auto NodeA2 = add_node(Graph, Queue, [&](handler &CGH) {
CGH.parallel_for(range<1>(Size),
[=](item<1> id) { PtrC2[id] = PtrA2[id]; });
});

// Vector add to output
auto NodeB2 = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeA2);
CGH.parallel_for(range<1>(Size),
[=](item<1> id) { PtrC2[id] += PtrB2[id]; });
},
NodeA2);

// Modify the output values in a host_task
auto NodeC2 = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeB2);
CGH.host_task([=]() {
for (size_t i = 0; i < Size; i++) {
PtrC2[i] *= ModValue;
}
});
},
NodeB2);

// Increment the output values
auto NodeD2 = add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeC2);
CGH.parallel_for(range<1>(Size), [=](item<1> id) { PtrC2[id] += 1; });
},
NodeC2);

// Multiply the output values by PtrB2
add_node(
Graph, Queue,
[&](handler &CGH) {
depends_on_helper(CGH, NodeD2);
CGH.parallel_for(range<1>(Size),
[=](item<1> id) { PtrC2[id] *= PtrB2[id]; });
},
NodeD2);

auto GraphExec = Graph.finalize();

event Event;
for (unsigned n = 0; n < Iterations; n++) {
Event = Queue.submit([&](handler &CGH) {
CGH.depends_on(Event);
CGH.ext_oneapi_graph(GraphExec);
});
Event.wait();
}
Queue.wait_and_throw();

Queue.copy(PtrC, DataC.data(), Size);
Queue.copy(PtrC2, Res2.data(), Size);
Queue.wait_and_throw();

free(PtrA, Queue);
free(PtrB, Queue);
free(PtrC, Queue);
free(PtrA2, Queue);
free(PtrB2, Queue);
free(PtrC2, Queue);

for (size_t i = 0; i < Size; i++) {
assert(check_value(i, Reference[i], DataC[i], "DataC"));
assert(check_value(i, Reference[i], Res2[i], "Res2"));
}

return 0;
}
