Skip to content

Commit

Permalink
Lock OperQ before checking content (#4446)
Browse files Browse the repository at this point in the history
* lock to check queue

* Some refactoring and other fixes

---------

Co-authored-by: Nick Banks <nibanks@microsoft.com>
  • Loading branch information
ami-GS and nibanks authored Aug 8, 2024
1 parent a6f38aa commit b01cef9
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 18 deletions.
10 changes: 8 additions & 2 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7559,7 +7559,8 @@ QuicConnProcessExpiredTimer(
_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
QuicConnDrainOperations(
_In_ QUIC_CONNECTION* Connection
_In_ QUIC_CONNECTION* Connection,
_Inout_ BOOLEAN* StillHasPriorityWork
)
{
QUIC_OPERATION* Oper;
Expand Down Expand Up @@ -7719,5 +7720,10 @@ QuicConnDrainOperations(

QuicConnValidate(Connection);

return HasMoreWorkToDo;
if (HasMoreWorkToDo) {
*StillHasPriorityWork = QuicOperationHasPriority(&Connection->OperQ);
return TRUE;
}

return FALSE;
}
3 changes: 2 additions & 1 deletion src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,8 @@ QuicConnIndicateEvent(
_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
QuicConnDrainOperations(
_In_ QUIC_CONNECTION* Connection
_In_ QUIC_CONNECTION* Connection,
_Inout_ BOOLEAN* StillHasPriorityWork
);

//
Expand Down
6 changes: 6 additions & 0 deletions src/core/inline.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,3 +816,9 @@ QuicMtuDiscoveryCheckSearchCompleteTimeout(
_In_ QUIC_CONNECTION* Connection,
_In_ uint64_t TimeNow
);

_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN
QuicOperationHasPriority(
_In_ QUIC_OPERATION_QUEUE* OperQ
);
16 changes: 16 additions & 0 deletions src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,22 @@ QuicOperationFree(
_In_ QUIC_OPERATION* Oper
);

//
// Returns TRUE if the operation queue has priority operations queued.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
inline
BOOLEAN
QuicOperationHasPriority(
_In_ QUIC_OPERATION_QUEUE* OperQ
)
{
CxPlatDispatchLockAcquire(&OperQ->Lock);
BOOLEAN HasPriorityWork = (&OperQ->List.Flink != OperQ->PriorityTail);
CxPlatDispatchLockRelease(&OperQ->Lock);
return HasPriorityWork;
}

//
// Enqueues an operation. Returns TRUE if the queue was previously empty and not
// already being processed.
Expand Down
35 changes: 20 additions & 15 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,25 +301,30 @@ _IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicWorkerMoveConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
_In_ QUIC_CONNECTION* Connection,
_In_ BOOLEAN IsPriority
)
{
CXPLAT_DBG_ASSERT(Connection->Worker != NULL);
CXPLAT_DBG_ASSERT(Connection->HasQueuedWork);

CxPlatDispatchLockAcquire(&Worker->Lock);

BOOLEAN WakeWorkerThread = QuicWorkerIsIdle(Worker);

if (Connection->HasQueuedWork) {
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
Connection,
QUIC_SCHEDULE_QUEUED);
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);
const BOOLEAN WakeWorkerThread = QuicWorkerIsIdle(Worker);
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
if (IsPriority) {
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
} else {
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
}
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
Connection,
QUIC_SCHEDULE_QUEUED);
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);

CxPlatDispatchLockRelease(&Worker->Lock);

Expand Down Expand Up @@ -549,8 +554,9 @@ QuicWorkerProcessConnection(
//
// Process some operations.
//
BOOLEAN StillHasPriorityWork = FALSE;
BOOLEAN StillHasWorkToDo =
QuicConnDrainOperations(Connection) | Connection->State.UpdateWorker;
QuicConnDrainOperations(Connection, &StillHasPriorityWork) | Connection->State.UpdateWorker;
Connection->WorkerThreadID = 0;

//
Expand All @@ -564,8 +570,7 @@ QuicWorkerProcessConnection(
if (!Connection->State.UpdateWorker) {
if (Connection->HasQueuedWork) {
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
if (&Connection->OperQ.List.Flink != Connection->OperQ.PriorityTail) {
// priority operations are still pending
if (StillHasPriorityWork) {
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
Expand Down Expand Up @@ -602,7 +607,7 @@ QuicWorkerProcessConnection(
CXPLAT_FRE_ASSERT(Connection->Registration != NULL);
QuicRegistrationQueueNewConnection(Connection->Registration, Connection);
CXPLAT_DBG_ASSERT(Worker != Connection->Worker);
QuicWorkerMoveConnection(Connection->Worker, Connection);
QuicWorkerMoveConnection(Connection->Worker, Connection, StillHasPriorityWork);
}

//
Expand Down

0 comments on commit b01cef9

Please sign in to comment.