From b01cef94f89f8542cb22b994925513ccc6b360f8 Mon Sep 17 00:00:00 2001 From: Daiki AMINAKA <1991.daiki@gmail.com> Date: Thu, 8 Aug 2024 11:29:33 -0700 Subject: [PATCH] Lock OperQ before checking content (#4446) * lock to check queue * Some refactoring and other fixes --------- Co-authored-by: Nick Banks --- src/core/connection.c | 10 ++++++++-- src/core/connection.h | 3 ++- src/core/inline.c | 6 ++++++ src/core/operation.h | 16 ++++++++++++++++ src/core/worker.c | 35 ++++++++++++++++++++--------------- 5 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/core/connection.c b/src/core/connection.c index d31e953609..7630a5bc79 100644 --- a/src/core/connection.c +++ b/src/core/connection.c @@ -7559,7 +7559,8 @@ QuicConnProcessExpiredTimer( _IRQL_requires_max_(PASSIVE_LEVEL) BOOLEAN QuicConnDrainOperations( - _In_ QUIC_CONNECTION* Connection + _In_ QUIC_CONNECTION* Connection, + _Inout_ BOOLEAN* StillHasPriorityWork ) { QUIC_OPERATION* Oper; @@ -7719,5 +7720,10 @@ QuicConnDrainOperations( QuicConnValidate(Connection); - return HasMoreWorkToDo; + if (HasMoreWorkToDo) { + *StillHasPriorityWork = QuicOperationHasPriority(&Connection->OperQ); + return TRUE; + } + + return FALSE; } diff --git a/src/core/connection.h b/src/core/connection.h index 1c4c301aff..bfff21972a 100644 --- a/src/core/connection.h +++ b/src/core/connection.h @@ -1144,7 +1144,8 @@ QuicConnIndicateEvent( _IRQL_requires_max_(PASSIVE_LEVEL) BOOLEAN QuicConnDrainOperations( - _In_ QUIC_CONNECTION* Connection + _In_ QUIC_CONNECTION* Connection, + _Inout_ BOOLEAN* StillHasPriorityWork ); // diff --git a/src/core/inline.c b/src/core/inline.c index f73a92ca53..a128550ea7 100644 --- a/src/core/inline.c +++ b/src/core/inline.c @@ -816,3 +816,9 @@ QuicMtuDiscoveryCheckSearchCompleteTimeout( _In_ QUIC_CONNECTION* Connection, _In_ uint64_t TimeNow ); + +_IRQL_requires_max_(DISPATCH_LEVEL) +BOOLEAN +QuicOperationHasPriority( + _In_ QUIC_OPERATION_QUEUE* OperQ + ); diff --git a/src/core/operation.h b/src/core/operation.h index b58691791b..3146588a07 100644 --- a/src/core/operation.h +++ b/src/core/operation.h @@ -338,6 +338,22 @@ QuicOperationFree( _In_ QUIC_OPERATION* Oper ); +// +// Returns TRUE if the operation queue has priority operations queued. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +inline +BOOLEAN +QuicOperationHasPriority( + _In_ QUIC_OPERATION_QUEUE* OperQ + ) +{ + CxPlatDispatchLockAcquire(&OperQ->Lock); + BOOLEAN HasPriorityWork = (&OperQ->List.Flink != OperQ->PriorityTail); + CxPlatDispatchLockRelease(&OperQ->Lock); + return HasPriorityWork; +} + // // Enqueues an operation. Returns TRUE if the queue was previously empty and not // already being processed. diff --git a/src/core/worker.c b/src/core/worker.c index e93d189a4e..1132642953 100644 --- a/src/core/worker.c +++ b/src/core/worker.c @@ -301,25 +301,30 @@ _IRQL_requires_max_(DISPATCH_LEVEL) void QuicWorkerMoveConnection( _In_ QUIC_WORKER* Worker, - _In_ QUIC_CONNECTION* Connection + _In_ QUIC_CONNECTION* Connection, + _In_ BOOLEAN IsPriority ) { CXPLAT_DBG_ASSERT(Connection->Worker != NULL); + CXPLAT_DBG_ASSERT(Connection->HasQueuedWork); CxPlatDispatchLockAcquire(&Worker->Lock); - BOOLEAN WakeWorkerThread = QuicWorkerIsIdle(Worker); - - if (Connection->HasQueuedWork) { - Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32(); - QuicTraceEvent( - ConnScheduleState, - "[conn][%p] Scheduling: %u", - Connection, - QUIC_SCHEDULE_QUEUED); - QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER); + const BOOLEAN WakeWorkerThread = QuicWorkerIsIdle(Worker); + Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32(); + if (IsPriority) { + CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink); + Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink; + Connection->HasPriorityWork = TRUE; + } else { CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink); } + QuicTraceEvent( + ConnScheduleState, + "[conn][%p] Scheduling: %u", + Connection, + QUIC_SCHEDULE_QUEUED); + QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER); CxPlatDispatchLockRelease(&Worker->Lock); @@ -549,8 +554,9 @@ QuicWorkerProcessConnection( // // Process some operations. // + BOOLEAN StillHasPriorityWork = FALSE; BOOLEAN StillHasWorkToDo = - QuicConnDrainOperations(Connection) | Connection->State.UpdateWorker; + QuicConnDrainOperations(Connection, &StillHasPriorityWork) | Connection->State.UpdateWorker; Connection->WorkerThreadID = 0; // @@ -564,8 +570,7 @@ QuicWorkerProcessConnection( if (!Connection->State.UpdateWorker) { if (Connection->HasQueuedWork) { Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32(); - if (&Connection->OperQ.List.Flink != Connection->OperQ.PriorityTail) { - // priority operations are still pending + if (StillHasPriorityWork) { CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink); Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink; Connection->HasPriorityWork = TRUE; @@ -602,7 +607,7 @@ QuicWorkerProcessConnection( CXPLAT_FRE_ASSERT(Connection->Registration != NULL); QuicRegistrationQueueNewConnection(Connection->Registration, Connection); CXPLAT_DBG_ASSERT(Worker != Connection->Worker); - QuicWorkerMoveConnection(Connection->Worker, Connection); + QuicWorkerMoveConnection(Connection->Worker, Connection, StillHasPriorityWork); } //