Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hipStreamSemantics Fixes #917

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions samples/hipStreamSemantics/hipStreamSemantics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ void callback_sleep2(hipStream_t stream, hipError_t status, void *user_data) {
printf("callback_sleep2: Exiting now\n");
}

void callback_sleep10(hipStream_t stream, hipError_t status, void *user_data) {
int *data = (int *)user_data;
printf("callback_sleep10: Going to sleep for 10sec\n");
sleep(10);
*data = 2;
printf("callback_sleep10: Exiting now\n");
}

/*
* Intent : Verify hipStreamQuery returns right queue status
Expand All @@ -66,7 +59,7 @@ bool TestStreamSemantics_1() {
hipStream_t stream;
CHECK(hipStreamCreate(&stream));
stream2_shared_data = (int *)malloc(sizeof(int));
CHECK(hipStreamAddCallback(stream, callback_sleep10, stream2_shared_data, 0));
CHECK(hipStreamAddCallback(stream, callback_sleep2, stream2_shared_data, 0));
status = hipStreamQuery(stream);
bool testStatus = true;
printf("%s(stream query) : ", __FUNCTION__);
Expand Down Expand Up @@ -108,7 +101,7 @@ bool TestStreamSemantics_2() {
host_ptr = (int *)malloc(size);

// Push a 10sec long taks into the stream
CHECK(hipStreamAddCallback(stream_non_blocking, callback_sleep10,
CHECK(hipStreamAddCallback(stream_non_blocking, callback_sleep2,
stream_shared_data, 0));

// printf("Starting task on null stream\n");
Expand All @@ -121,11 +114,11 @@ bool TestStreamSemantics_2() {

bool testStatus = true;
printf("%s (non-blocking stream): ", __FUNCTION__);
if (*host_ptr == 101 && *stream_shared_data == 2) {
testStatus = false;
if (*host_ptr != 101 && *stream_shared_data != 2) {
printf("%s %s %s\n", "\033[0;31m", "Failed", "\033[0m");
// printf("host_ptr = %d, stream_shared_data = %d\n", *host_ptr,
// *stream_shared_data);fflush(stdout);
printf("host_ptr = %d, stream_shared_data = %d\n", *host_ptr,
*stream_shared_data);
fflush(stdout);
} else {
printf("PASSED\n");
}
Expand Down Expand Up @@ -165,7 +158,7 @@ bool TestStreamSemantics_3() {

*stream2_shared_data = 1;
CHECK(
hipStreamAddCallback(stream2, callback_sleep10, stream2_shared_data, 0));
hipStreamAddCallback(stream2, callback_sleep2, stream2_shared_data, 0));

printf("Going to sync stream1\n");
CHECK(hipStreamSynchronize(stream1));
Expand Down
4 changes: 3 additions & 1 deletion src/CHIPBackend.hh
Original file line number Diff line number Diff line change
Expand Up @@ -2352,7 +2352,9 @@ public:
const std::vector<std::shared_ptr<chipstar::Event>> &EventsToWaitFor) = 0;
virtual std::shared_ptr<chipstar::Event> enqueueBarrier(
const std::vector<std::shared_ptr<chipstar::Event>> &EventsToWaitFor);

virtual std::shared_ptr<chipstar::Event> enqueueBarrier() {
return enqueueBarrier({});
}
virtual std::shared_ptr<chipstar::Event> enqueueMarkerImpl() = 0;
std::shared_ptr<chipstar::Event> enqueueMarker();

Expand Down
8 changes: 5 additions & 3 deletions src/CHIPBindings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2464,7 +2464,9 @@ hipError_t hipDeviceGetStreamPriorityRange(int *LeastPriority,
hipError_t hipStreamDestroy(hipStream_t Stream) {
CHIP_TRY
LOCK(ApiMtx);
LOCK(Backend->EventsMtx);
// Can't be locking this mutex because callback monitor needs to be able to
// set events
// LOCK(Backend->EventsMtx);
CHIPInitialize();
if (Stream == hipStreamPerThread)
CHIPERR_LOG_AND_THROW("Attemped to destroy default per-thread queue",
Expand All @@ -2483,8 +2485,8 @@ hipError_t hipStreamDestroy(hipStream_t Stream) {

chipstar::Device *Dev = Backend->getActiveDevice();

// make sure nothing is pending in the stream
ChipQueue->finish();
// finish all streams. Need to make sure that no Dev->synchronizeQueues();
hipDeviceSynchronizeInternal();

LOCK(Dev->QueueAddRemoveMtx);
if (Dev->removeQueue(ChipQueue))
Expand Down
53 changes: 32 additions & 21 deletions src/backend/Level0/CHIPBackendLevel0.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ CHIPEventLevel0::CHIPEventLevel0(CHIPContextLevel0 *ChipCtx,
};

zeStatus = zeEventPoolCreate(ZeCtx->get(), &EventPoolDesc, 0, nullptr,
&EventPoolHandle_);
&EventPoolHandle_);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeEventPoolCreate);

ze_event_desc_t EventDesc = {
Expand Down Expand Up @@ -349,8 +349,8 @@ void CHIPQueueLevel0::recordEvent(chipstar::Event *ChipEvent) {
addDependenciesQueueSync(TimestampWriteCompleteLz);

zeStatus = zeDeviceGetGlobalTimestamps(ChipDevLz_->get(),
&ChipEventLz->getHostTimestamp(),
&ChipEventLz->getDeviceTimestamp());
&ChipEventLz->getHostTimestamp(),
&ChipEventLz->getDeviceTimestamp());
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeDeviceGetGlobalTimestamps);

Borrowed<FencedCmdList> CommandList = ChipCtxLz_->getCmdListReg();
Expand Down Expand Up @@ -556,9 +556,9 @@ CHIPCallbackDataLevel0::CHIPCallbackDataLevel0(hipStreamCallback_t CallbackF,
auto [QueueSyncEvents, EventLocks] =
ChipQueueLz->addDependenciesQueueSync(GpuReady);
// Add a barrier so that it signals
zeStatus = zeCommandListAppendBarrier(CommandList->getCmdList(),
GpuReadyLz->get(), QueueSyncEvents.size(),
QueueSyncEvents.data());
zeStatus = zeCommandListAppendBarrier(
CommandList->getCmdList(), GpuReadyLz->get(), QueueSyncEvents.size(),
QueueSyncEvents.data());
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandListAppendBarrier);

// This will get triggered manually
Expand All @@ -576,7 +576,16 @@ CHIPCallbackDataLevel0::CHIPCallbackDataLevel0(hipStreamCallback_t CallbackF,
&CpuCallbackCompleteLz->get());
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandListAppendBarrier);

ChipQueueLz->executeCommandList(CommandList, GpuAck);
// Need to create another event as GpuAck will be destroyed once callback is
// complete
auto CallbackComplete = BackendLz->createEventShared(ChipContextLz);
CallbackComplete->Msg = "CallbackComplete";
auto CallbackCompleteLz =
std::static_pointer_cast<CHIPEventLevel0>(CallbackComplete);
zeStatus = zeCommandListAppendBarrier(CommandList->getCmdList(),
CallbackCompleteLz->get(), 0, nullptr);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandListAppendBarrier);
ChipQueueLz->executeCommandList(CommandList, CallbackComplete);
}

// End CHIPCallbackDataLevel0
Expand Down Expand Up @@ -993,7 +1002,7 @@ CHIPQueueLevel0::CHIPQueueLevel0(CHIPDeviceLevel0 *ChipDev,

void CHIPQueueLevel0::initializeCmdListImm() {
zeStatus = zeCommandListCreateImmediate(ZeCtx_, ZeDev_, &QueueDescriptor_,
&ZeCmdListImm_);
&ZeCmdListImm_);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandListCreateImmediate);
}

Expand All @@ -1002,7 +1011,7 @@ void CHIPDeviceLevel0::initializeQueueGroupProperties() {
// Discover the number of command queues
uint32_t CmdqueueGroupCount = 0;
zeStatus = zeDeviceGetCommandQueueGroupProperties(ZeDev_, &CmdqueueGroupCount,
nullptr);
nullptr);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeDeviceGetCommandQueueGroupProperties);
logTrace("CommandGroups found: {}", CmdqueueGroupCount);

Expand Down Expand Up @@ -1155,9 +1164,9 @@ CHIPQueueLevel0::launchImpl(chipstar::ExecItem *ExecItem) {
if (!LzDev->hasOnDemandPaging()) {
// The baseline answer is yes (unless we would know that the
// kernel won't access buffers indirectly).
zeStatus = zeKernelSetIndirectAccess(KernelZe,
ZE_KERNEL_INDIRECT_ACCESS_FLAG_DEVICE |
ZE_KERNEL_INDIRECT_ACCESS_FLAG_HOST);
zeStatus = zeKernelSetIndirectAccess(
KernelZe, ZE_KERNEL_INDIRECT_ACCESS_FLAG_DEVICE |
ZE_KERNEL_INDIRECT_ACCESS_FLAG_HOST);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeKernelSetIndirectAccess);
}

Expand Down Expand Up @@ -1419,7 +1428,7 @@ std::shared_ptr<chipstar::Event> CHIPQueueLevel0::enqueueBarrierImpl(
// simultaneous threads with the same command list handle.
// Done via LOCK(CommandListMtx)
zeStatus = zeCommandListAppendBarrier(CommandList, SignalEventHandle,
NumEventsToWaitFor, EventHandles);
NumEventsToWaitFor, EventHandles);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandListAppendBarrier);
executeCommandList(CommandList, BarrierEvent);

Expand Down Expand Up @@ -1460,10 +1469,11 @@ void CHIPQueueLevel0::finish() {
if (LastEvent)
LastEvent->wait();

zeStatus = zeCommandQueueSynchronize(ZeCmdQ_, ChipEnvVars.getL0EventTimeout());
zeStatus =
zeCommandQueueSynchronize(ZeCmdQ_, ChipEnvVars.getL0EventTimeout());
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeCommandQueueSynchronize,
"zeCommandQueueSynchronize timeout out");

this->LastEvent_ = nullptr;
return;
}

Expand Down Expand Up @@ -1702,8 +1712,8 @@ void CHIPBackendLevel0::initializeImpl() {
0};

ze_context_handle_t ZeCtx;
zeStatus = zeContextCreateEx(ZeDriver, &CtxDesc, DeviceCount, ZeDevices.data(),
&ZeCtx);
zeStatus = zeContextCreateEx(ZeDriver, &CtxDesc, DeviceCount,
ZeDevices.data(), &ZeCtx);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeContextCreateEx);
CHIPContextLevel0 *ChipL0Ctx = new CHIPContextLevel0(ZeDriver, ZeCtx);
::Backend->addContext(ChipL0Ctx);
Expand Down Expand Up @@ -1848,8 +1858,8 @@ void *CHIPContextLevel0::allocateImpl(size_t Size, size_t Alignment,
// *)getDevices()[0])->get();
ze_device_handle_t ZeDev = nullptr; // Do not associate allocation

zeStatus = zeMemAllocShared(ZeCtx, &DmaDesc, &HmaDesc, Size, Alignment, ZeDev,
&Ptr);
zeStatus = zeMemAllocShared(ZeCtx, &DmaDesc, &HmaDesc, Size, Alignment,
ZeDev, &Ptr);
CHIPERR_CHECK_LOG_AND_THROW_TABLE(zeMemAllocShared);
} else if (MemTy == hipMemoryType::hipMemoryTypeDevice) {
auto ChipDev = (CHIPDeviceLevel0 *)Backend->getActiveDevice();
Expand Down Expand Up @@ -2534,15 +2544,16 @@ void CHIPExecItemLevel0::setupAllArgs() {
if (zeStatus != ZE_RESULT_SUCCESS) {
logWarn("zeKernelSetArgumentValue returned error, "
"setting the ptr arg to nullptr");
zeStatus = zeKernelSetArgumentValue(Kernel->get(), Arg.Index, 0, nullptr);
zeStatus =
zeKernelSetArgumentValue(Kernel->get(), Arg.Index, 0, nullptr);
}
break;
}
case SPVTypeKind::PODByRef: {
auto *SpillSlot = ArgSpillBuffer_->allocate(Arg);
assert(SpillSlot);
zeStatus = zeKernelSetArgumentValue(Kernel->get(), Arg.Index,
sizeof(void *), &SpillSlot);
sizeof(void *), &SpillSlot);
break;
}
default:
Expand Down
1 change: 1 addition & 0 deletions src/backend/OpenCL/CHIPBackendOpenCL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,7 @@ void CHIPQueueOpenCL::finish() {
#endif
clStatus = get()->finish();
CHIPERR_CHECK_LOG_AND_THROW_TABLE(clFinish);
this->LastEvent_ = nullptr;
}

std::shared_ptr<chipstar::Event>
Expand Down
2 changes: 1 addition & 1 deletion tests/known_failures.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ ANY:
cuda-simpleCallback: ''
deviceMallocCompile: ''
hipMultiThreadAddCallback: ''
hipStreamSemantics: ''
syncthreadsExitedThreads: 'Exited threads calling __syncthreads'
LEVEL0_GPU:
hipMemset_Unit_hipMemsetAsync_SetMemoryWithOffset_Helgrind: 'False positives from L0 helper thread'
Expand Down Expand Up @@ -485,6 +484,7 @@ salami:
LEVEL0_GPU:
OPENCL_CPU:
OPENCL_GPU:
hipStreamSemantics: 'TestStreamSemantics_2 (non-blocking stream): Failed, then timeout'
Unit_hipGraphAddEventRecordNode_Functional_ElapsedTime: 'Timeout'
Unit_hipGraphEventRecordNodeSetEvent_SetEventProperty: 'Timeout'
Unit_hipStreamAddCallback_ParamTst_Positive: 'Timeout'
Expand Down
Loading