Skip to content

Commit

Permalink
Fix pinnedness.
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
  • Loading branch information
mzient committed Jul 18, 2024
1 parent 3dfbac0 commit c46577e
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 31 deletions.
7 changes: 6 additions & 1 deletion dali/pipeline/data/buffer.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,6 +64,11 @@ DLL_PUBLIC shared_ptr<uint8_t> AllocBuffer(size_t bytes, bool pinned,

DLL_PUBLIC bool RestrictPinnedMemUsage() {
static bool val = []() {
/* int n = 0;
if (cudaGetDeviceCount(&n) != CUDA_SUCCESS)
return true;
if (n == 0)
return true; */
const char *env = getenv("DALI_RESTRICT_PINNED_MEM");
return env && atoi(env);
}();
Expand Down
8 changes: 4 additions & 4 deletions dali/pipeline/executor/executor2/exec_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void ExecNode::CreateAuxTasks() {
release_outputs = Task::Create([]() {});
release_outputs->ReleaseAfterRun(output_queue_limit);
release_outputs->Succeed(main_task);
for (auto &consumers : outputs) {
for (auto *edge : consumers) {
for (auto &output : outputs) {
for (auto *edge : output.consumers) {
if (edge->consumer->main_task)
release_outputs->Succeed(edge->consumer->main_task);
}
Expand Down Expand Up @@ -215,7 +215,7 @@ void ExecGraph::Validate() {

if (e.producer_output_idx >= static_cast<int>(e.producer->outputs.size()))
err("producer output index is out of range.");
auto &consumer_edges = e.producer->outputs[e.producer_output_idx];
auto &consumer_edges = e.producer->outputs[e.producer_output_idx].consumers;
if (std::count(consumer_edges.begin(), consumer_edges.end(), &e) != 1)
err("the relevant producer's output doesn't have this edge as one of the consumers.");

Expand All @@ -233,7 +233,7 @@ void ExecGraph::Validate() {
}

for (int o = 0, nout = n.outputs.size(); o < nout; o++) {
auto &consumers = n.outputs[o];
auto &consumers = n.outputs[o].consumers;
for (auto &e : consumers) {
if (!known_edges.count(e))
err("a node's output is not a known edge pointer.");
Expand Down
13 changes: 11 additions & 2 deletions dali/pipeline/executor/executor2/exec_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ struct ExecEdge {
/** A tag type for constructing output ExecNode */
struct PipelineOutputTag {};

struct ExecOutputDesc {
SmallVector<ExecEdge *, 4> consumers;

StorageDevice device = StorageDevice::CPU;
bool pinned = false;
};

/** An execution node.
*
* An execution node corresponds to an operator node or an output node in the pipeline
Expand Down Expand Up @@ -119,7 +126,7 @@ class DLL_PUBLIC ExecNode {
* The outputs must appear in the same order as they're defined in the operator's OpSpec.
* The order of consumer edges in each output is not important.
*/
SmallVector<SmallVector<ExecEdge *, 4>, 4> outputs;
SmallVector<ExecOutputDesc, 4> outputs;

/** A semaphore limiting the cuncurrency of the operator.
*
Expand Down Expand Up @@ -303,7 +310,7 @@ class DLL_PUBLIC ExecGraph {

if (producer) {
producer->outputs.resize(std::max<size_t>(producer->outputs.size(), out_idx + 1));
producer->outputs[out_idx].push_back(&edge);
producer->outputs[out_idx].consumers.push_back(&edge);
}
if (consumer) {
consumer->inputs.resize(std::max<size_t>(consumer->inputs.size(), in_idx + 1));
Expand All @@ -320,6 +327,8 @@ class DLL_PUBLIC ExecGraph {
/** Populates the graph based on a pipeline definiton graph. */
void Lower(const graph::OpGraph &def);

void FindPinnedBuffers();

private:
std::list<ExecNode> nodes_;
std::list<ExecEdge> edges_;
Expand Down
73 changes: 72 additions & 1 deletion dali/pipeline/executor/executor2/exec_graph_lowering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
for (ExecNode &exec_node : nodes_) {
assert(it_def != def.OpNodes().end());
auto op_node = *it_def++;
assert(exec_node.outputs.size() == op_node.outputs.size());
for (int o = 0, nout = op_node.outputs.size(); o < nout; o++) {
const auto &out = op_node.outputs[o];
auto dev = out->device;
for (auto &consumer : out->consumers) {
auto *exec_con = def2exec[consumer.op];
assert(exec_con != nullptr);
Link(&exec_node, o, exec_con, consumer.idx);
Link(&exec_node, o, exec_con, consumer.idx)->device = dev;
}
exec_node.outputs[o].device = dev;
}
}

Expand All @@ -56,8 +59,76 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
edge->device = data_node->device;
}

FindPinnedBuffers();
Validate();
}

namespace {

/** Sets pinnedness of the input sources
*
* The function goes over the inputs of the node. If the node is non-CPU, then all of its
* CPU _regular_ inputs are marked as pinned.
* If the node is a CPU node but passes through an input `i` directly to a pinned output `o`,
* then the source of input `i` is also marked as pinned.
*/
void SetPinnedInputs(ExecNode *node) {
assert(node->op != nullptr);

// TODO(michalz): Update if/when we have passthrough for argument inputs
int ninp = node->op->GetSpec().NumRegularInput();
assert(static_cast<size_t>(ninp) <= node->inputs.size());

if (node->backend != OpType::CPU) {
for (int i = 0; i < ninp; i++) {
auto *inp = node->inputs[i];
inp->producer->outputs[inp->producer_output_idx].pinned = true;
}
} else if (node->op->GetSpec().GetSchema().HasPassThrough()) {
auto &schema = node->op->GetSpec().GetSchema();
int nout = node->outputs.size();
for (int i = 0; i < ninp; i++) {
auto *input = node->inputs[i];
if (input->device != StorageDevice::CPU) // we're not interested in non-CPU buffers
continue;

auto &source_output = input->producer->outputs[input->producer_output_idx];
if (source_output.pinned) // already pinned
continue;

for (int o = 0; o < nout; o++) {
// If input `i` passes to a pinned output `o`, then the input should also be marked
// as pinned. This will be followed in reverse topological order.
if (node->outputs[o].pinned && schema.IsPassThrough(i, o, false)) {
source_output.pinned = true;
break;
}
}
}
}
}

} // namespace

void ExecGraph::FindPinnedBuffers() {
// No non-cpu ops? Just mark everything as non-pinned and we're done.
auto is_gpu_edge = [](const ExecEdge &e) { return e.device == StorageDevice::GPU; };
bool has_gpu_buffers = std::find_if(edges_.begin(), edges_.end(), is_gpu_edge) != edges_.end();
if (!has_gpu_buffers) {
for (auto &n : nodes_)
for (auto &o : n.outputs)
o.pinned = false;
return;
}

// go in reverse topological order, from outputs to inputs
for (auto it = nodes_.rbegin(); it != nodes_.rend(); ++it) {
ExecNode &n = *it;
if (n.is_pipeline_output)
continue;
SetPinnedInputs(&n);
}
}

} // namespace exec2
} // namespace dali
41 changes: 21 additions & 20 deletions dali/pipeline/executor/executor2/exec_graph_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,15 @@ TEST(ExecGraphTest, Exception) {
.AddOutput("op2e0", "cpu")
.AddArg("name", "op2");
auto op2 = std::make_unique<DummyOpCPU>(spec2);
ExecGraph def;
ExecNode *n2 = def.AddNode(std::move(op2));
ExecNode *n1 = def.AddNode(std::move(op1));
ExecNode *n0 = def.AddNode(std::move(op0));
ExecNode *no = def.AddOutputNode();
def.Link(n0, 0, n2, 0);
def.Link(n1, 0, n2, 1);
def.Link(n2, 0, no, 0);
ExecGraph g;
ExecNode *n2 = g.AddNode(std::move(op2));
ExecNode *n1 = g.AddNode(std::move(op1));
ExecNode *n0 = g.AddNode(std::move(op0));
ExecNode *no = g.AddOutputNode();
g.Link(n0, 0, n2, 0);
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);
ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
Expand All @@ -276,8 +277,8 @@ TEST(ExecGraphTest, Exception) {
tasking::Executor ex(4);
ex.Start();
for (int i = 0; i < 10; i++) {
def.PrepareIteration(std::make_shared<IterationData>(), params);
auto fut = def.Launch(ex);
g.PrepareIteration(std::make_shared<IterationData>(), params);
auto fut = g.Launch(ex);
EXPECT_THROW(fut.Value<const PipelineOutput &>(), DALIException);
}
}
Expand Down Expand Up @@ -313,25 +314,25 @@ TEST(ExecGraphTest, LoweredStructureMatch) {
auto &ex_out = g.Nodes().back();

ASSERT_EQ(ex0.outputs.size(), 1_uz);
ASSERT_EQ(ex0.outputs[0].size(), 2_uz);
EXPECT_EQ(ex0.outputs[0][0]->consumer, &ex2);
EXPECT_EQ(ex0.outputs[0][1]->consumer, &ex3);
ASSERT_EQ(ex0.outputs[0].consumers.size(), 2_uz);
EXPECT_EQ(ex0.outputs[0].consumers[0]->consumer, &ex2);
EXPECT_EQ(ex0.outputs[0].consumers[1]->consumer, &ex3);

ASSERT_EQ(ex1.outputs.size(), 1_uz);
EXPECT_EQ(ex1.outputs[0][0]->consumer, &ex2);
ASSERT_EQ(ex1.outputs[0].size(), 2_uz);
EXPECT_EQ(ex1.outputs[0][1]->consumer, &ex3);
EXPECT_EQ(ex1.outputs[0].consumers[0]->consumer, &ex2);
ASSERT_EQ(ex1.outputs[0].consumers.size(), 2_uz);
EXPECT_EQ(ex1.outputs[0].consumers[1]->consumer, &ex3);

ASSERT_EQ(ex2.outputs.size(), 1_uz);
ASSERT_EQ(ex2.outputs[0].size(), 1_uz);
EXPECT_EQ(ex2.outputs[0][0]->consumer, &ex_out);
ASSERT_EQ(ex2.outputs[0].consumers.size(), 1_uz);
EXPECT_EQ(ex2.outputs[0].consumers[0]->consumer, &ex_out);
ASSERT_EQ(ex2.inputs.size(), 2_uz);
EXPECT_EQ(ex2.inputs[0]->producer, &ex0);
EXPECT_EQ(ex2.inputs[1]->producer, &ex1);

ASSERT_EQ(ex3.outputs.size(), 1_uz);
ASSERT_EQ(ex3.outputs[0].size(), 1_uz);
EXPECT_EQ(ex3.outputs[0][0]->consumer, &ex_out);
ASSERT_EQ(ex3.outputs[0].consumers.size(), 1_uz);
EXPECT_EQ(ex3.outputs[0].consumers[0]->consumer, &ex_out);
EXPECT_EQ(ex3.inputs[0]->producer, &ex0);
EXPECT_EQ(ex3.inputs[1]->producer, &ex1);

Expand Down
3 changes: 2 additions & 1 deletion dali/pipeline/executor/executor2/op_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ void OpTask::SetupOp() {
if (ws.OutputIsType<CPUBackend>(i)) {
if (!ws.OutputPtr<CPUBackend>(i)) {
auto tl = std::make_shared<TensorList<CPUBackend>>(output_descs[i].shape.num_samples());
tl->set_pinned(node_->outputs[i].pinned);
ws.SetOutput(i, tl);
}
if (should_resize)
Expand Down Expand Up @@ -190,7 +191,7 @@ void OpTask::SetWorkspaceInputs() {
AccessOrder OpTask::OutputConsumerOrder(int output_idx) {
assert(static_cast<size_t>(output_idx) < node_->outputs.size());
// Return the common strueam.
auto &consumers = node_->outputs[output_idx];
auto &consumers = node_->outputs[output_idx].consumers;
if (consumers.empty())
return {}; // definitely no consumer
AccessOrder order = consumers[0]->consumer->env.order;
Expand Down
4 changes: 2 additions & 2 deletions dali/pipeline/executor/executor2/stream_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ class StreamAssignment<StreamPolicy::PerOperator> {

if (stream_id.has_value())
free_stream_ids_.insert(*stream_id);
for (auto &consumers : node->outputs) {
for (auto *out : consumers) {
for (auto &output_desc : node->outputs) {
for (auto *out : output_desc.consumers) {
auto out_stream_id = NextStreamId(out->consumer, stream_id);
if (out_stream_id.has_value())
keep_stream_id = false;
Expand Down

0 comments on commit c46577e

Please sign in to comment.