Skip to content

Commit

Permalink
Relax comparison instead of disabling it
Browse files Browse the repository at this point in the history
  • Loading branch information
rmccorm4 committed Jul 12, 2024
1 parent ff64478 commit 2919536
Showing 1 changed file with 102 additions and 109 deletions.
211 changes: 102 additions & 109 deletions src/test/async_work_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,115 +91,108 @@ TEST_F(AsyncWorkQueueTest, WorkerCountInitialized)
<< "Expect 4 worker count for initialized queue";
}

// FIXME: The AsyncWorkQueue parallelism settings don't currently work as
// intended and cause this subtest to be flaky when expecting parallel speedup.
// There is currently no plan to fix this in the near term, as we don't rely
// on this feature for core performance.
//
// TEST_F(AsyncWorkQueueTest, RunTasksInParallel)
//{
// auto AddTwoFn = [](const std::vector<int>& lhs, const std::vector<int>& rhs,
// std::promise<std::vector<int>>* res) {
// std::vector<int> lres;
// lres.reserve(lhs.size());
// for (size_t idx = 0; idx < lhs.size(); idx++) {
// lres.push_back(lhs[idx] + rhs[idx]);
// }
// res->set_value(lres);
// };
//
// size_t task_count = 8;
// std::vector<std::vector<int>> operands;
// std::vector<std::vector<int>> expected_results;
// {
// // Use large element count to reduce the async work queue overhead
// size_t element_count = 1 << 24;
// auto RandHalfIntFn = std::bind(
// std::uniform_int_distribution<>{
// std::numeric_limits<int>::min() / 2,
// std::numeric_limits<int>::max() / 2},
// std::default_random_engine{});
// for (size_t tc = 0; tc < task_count + 1; tc++) {
// expected_results.push_back(std::vector<int>());
// operands.push_back(std::vector<int>());
// operands.back().reserve(element_count);
// for (size_t ec = 0; ec < element_count; ec++) {
// operands.back().push_back(RandHalfIntFn());
// }
// }
// }
//
// // Get serialized time as baseline and store expected results
// uint64_t serialized_duration = 0;
// {
// std::vector<std::promise<std::vector<int>>> res(task_count);
//
// auto start_ts =
// std::chrono::duration_cast<std::chrono::nanoseconds>(
// std::chrono::high_resolution_clock::now().time_since_epoch())
// .count();
//
// for (size_t count = 0; count < task_count; count++) {
// AddTwoFn(operands[count], operands[count + 1], &res[count]);
// }
//
// auto end_ts =
// std::chrono::duration_cast<std::chrono::nanoseconds>(
// std::chrono::high_resolution_clock::now().time_since_epoch())
// .count();
//
// for (size_t count = 0; count < task_count; count++) {
// expected_results[count] = std::move(res[count].get_future().get());
// }
// serialized_duration = end_ts - start_ts;
// }
//
// auto error = tc::AsyncWorkQueue::Initialize(4);
// ASSERT_TRUE(error.IsOk()) << error.Message();
//
// uint64_t parallelized_duration = 0;
// {
// std::vector<std::promise<std::vector<int>>> ps(task_count);
// std::vector<std::future<std::vector<int>>> fs;
// for (auto& p : ps) {
// fs.emplace_back(std::move(p.get_future()));
// }
//
// auto start_ts =
// std::chrono::duration_cast<std::chrono::nanoseconds>(
// std::chrono::high_resolution_clock::now().time_since_epoch())
// .count();
//
// for (size_t count = 0; count < task_count; count++) {
// tc::AsyncWorkQueue::AddTask([&AddTwoFn, &operands, &ps, count]() mutable
// {
// AddTwoFn(operands[count], operands[count + 1], &ps[count]);
// });
// }
// for (size_t count = 0; count < task_count; count++) {
// fs[count].wait();
// }
//
// auto end_ts =
// std::chrono::duration_cast<std::chrono::nanoseconds>(
// std::chrono::high_resolution_clock::now().time_since_epoch())
// .count();
//
// parallelized_duration = end_ts - start_ts;
// // FIXME manual testing shows parallelized time is between 30% to 33.3%
// for
// // 128 M total elements
// EXPECT_LT(parallelized_duration, serialized_duration / 3)
// << "Expected parallelized work was completed within 1/3 of serialized
// "
// "time";
// for (size_t count = 0; count < task_count; count++) {
// auto res = std::move(fs[count].get());
// EXPECT_EQ(res, expected_results[count])
// << "Mismatched parallelized result";
// }
// }
//}
TEST_F(AsyncWorkQueueTest, RunTasksInParallel)
{
auto AddTwoFn = [](const std::vector<int>& lhs, const std::vector<int>& rhs,
std::promise<std::vector<int>>* res) {
std::vector<int> lres;
lres.reserve(lhs.size());
for (size_t idx = 0; idx < lhs.size(); idx++) {
lres.push_back(lhs[idx] + rhs[idx]);
}
res->set_value(lres);
};

size_t task_count = 8;
std::vector<std::vector<int>> operands;
std::vector<std::vector<int>> expected_results;
{
// Use large element count to reduce the async work queue overhead
size_t element_count = 1 << 24;
auto RandHalfIntFn = std::bind(
std::uniform_int_distribution<>{
std::numeric_limits<int>::min() / 2,
std::numeric_limits<int>::max() / 2},
std::default_random_engine{});
for (size_t tc = 0; tc < task_count + 1; tc++) {
expected_results.push_back(std::vector<int>());
operands.push_back(std::vector<int>());
operands.back().reserve(element_count);
for (size_t ec = 0; ec < element_count; ec++) {
operands.back().push_back(RandHalfIntFn());
}
}
}

// Get serialized time as baseline and store expected results
uint64_t serialized_duration = 0;
{
std::vector<std::promise<std::vector<int>>> res(task_count);

auto start_ts =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();

for (size_t count = 0; count < task_count; count++) {
AddTwoFn(operands[count], operands[count + 1], &res[count]);
}

auto end_ts =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();

for (size_t count = 0; count < task_count; count++) {
expected_results[count] = std::move(res[count].get_future().get());
}
serialized_duration = end_ts - start_ts;
}

auto error = tc::AsyncWorkQueue::Initialize(4);
ASSERT_TRUE(error.IsOk()) << error.Message();

uint64_t parallelized_duration = 0;
{
std::vector<std::promise<std::vector<int>>> ps(task_count);
std::vector<std::future<std::vector<int>>> fs;
for (auto& p : ps) {
fs.emplace_back(std::move(p.get_future()));
}

auto start_ts =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();

for (size_t count = 0; count < task_count; count++) {
tc::AsyncWorkQueue::AddTask([&AddTwoFn, &operands, &ps, count]() mutable {
AddTwoFn(operands[count], operands[count + 1], &ps[count]);
});
}
for (size_t count = 0; count < task_count; count++) {
fs[count].wait();
}

auto end_ts =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();

parallelized_duration = end_ts - start_ts;
// FIXME manual testing shows parallelized time is between 30% to 33.3%
// for 128 M total elements, but is flaky in CI so relax the comparison
// for parallel to simply be faster than serial by any amount.
EXPECT_LT(parallelized_duration, serialized_duration)
<< "Expected parallelized work was completed faster than serialized "
"time";
for (size_t count = 0; count < task_count; count++) {
auto res = std::move(fs[count].get());
EXPECT_EQ(res, expected_results[count])
<< "Mismatched parallelized result";
}
}
}

TEST_F(AsyncWorkQueueTest, RunTasksFIFO)
{
Expand Down

0 comments on commit 2919536

Please sign in to comment.