Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/platform/datapath_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -1021,10 +1021,20 @@ CxPlatSocketContextUninitialize(
//
epoll_ctl(*SocketContext->DatapathPartition->EventQ, EPOLL_CTL_DEL, SocketContext->SocketFd, NULL);

CXPLAT_FRE_ASSERT(
CxPlatEventQEnqueue(
if (!CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe));
&SocketContext->ShutdownSqe)) {
int Errno = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
Errno,
"CxPlatEventQEnqueue failed (Shutdown)");

// Queue can’t run the completion, so do it inline to finish teardown.
CxPlatSocketContextUninitializeEventComplete(&SocketContext->ShutdownSqe.Cqe);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can simply run inline - in at least some of these files, I expect the orderly completion is relied on for serialization / mutual exclusion. Instead, we need to provide a backup mechanism that ensures orderly completion runs even if the underlying OS queue fails.

Copy link
Contributor Author

@Santhosha-bk Santhosha-bk Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mtfriesen Rather than running the completion inline, we could retry the enqueue after a delay as a backup mechanism: on failure, wait 1 ms and re-add the event, as shown below. Please share your thoughts on this approach.

while (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe)) {
            if (++RetryCount >= MaxRetries) {
                QuicTraceLogWarning(
                    WorkerShutdownEnqueueRetry,
                    "[wrkr][%p] Shutdown enqueue failed, retry %u/%u",
                    Worker,
                    RetryCount,
                    MaxRetries);
                break;
            }
            // Exponential backoff: 1ms, 2ms, 4ms, 8ms, ...
            CxPlatSleep(1U << (RetryCount - 1));
 }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that retrying with a delay is an acceptable option; it only pushes the problem back. What happens if it keeps failing: do we hang forever, or are we back to a FRE assert?

It might also be worth checking whether the issue reported in the bug can happen for Unix.
Typically, memory management won't let an allocation failure happen (a process will either get memory or be killed).

Copy link
Contributor Author

@Santhosha-bk Santhosha-bk Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we could have a loop of 5-10 retries, but I agree this is not an acceptable solution.

MsQuic stalls only in CxPlatWorkerPoolDestroyWorker because the thread waits for the worker thread to rejoin after a failed CxPlatEventQEnqueue. In all other cases where CxPlatEventQEnqueue is used, this behavior does not occur.

}
}
}

Expand Down Expand Up @@ -2183,10 +2193,20 @@ SocketSend(
CxPlatLockRelease(&SocketContext->TxQueueLock);
if (SendPending) {
if (FlushTxQueue) {
CXPLAT_FRE_ASSERT(
CxPlatEventQEnqueue(
if (!CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->FlushTxSqe));
&SocketContext->FlushTxSqe)) {
int Errno = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
Errno,
"CxPlatEventQEnqueue failed (FlushTx)");

// Run the completion inline to keep draining sends.
CxPlatSocketContextFlushTxEventComplete(&SocketContext->FlushTxSqe.Cqe);
}
}
return;
}
Expand Down
17 changes: 14 additions & 3 deletions src/platform/datapath_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -1012,9 +1012,20 @@ CxPlatSocketContextUninitialize(
&SocketContext->IoSqe,
EVFILT_READ,
EV_DELETE);
CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe);
if (!CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe)) {
int Errno = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
Errno,
"CxPlatEventQEnqueue failed (Shutdown)");

// Queue can’t run the completion, so run it inline.
CxPlatSocketContextUninitializeEventComplete(&SocketContext->ShutdownSqe.Cqe);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/platform/datapath_raw_xdp_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,18 @@ CxPlatXdpExecute(
XdpPartitionShutdown,
"[ xdp][%p] XDP partition shutdown",
Partition);
CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe);
if (!CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe)) {
QuicTraceEvent(
LibraryErrorStatus,
"[xdp] ERROR, %u, %s.",
errno,
"CxPlatEventQEnqueue failed (Shutdown)");
//
// The event queue can’t deliver the shutdown SQE, so run the completion
// inline to drop the reference.
//
CxPlatPartitionShutdownEventComplete(&Partition->ShutdownSqe.Cqe);
}
return FALSE;
}

Expand Down
13 changes: 12 additions & 1 deletion src/platform/datapath_raw_xdp_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,18 @@ CxPlatXdpExecute(
Queue->TxXsk = NULL;
Queue = Queue->Next;
}
CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe);
if (!CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe)) {
QuicTraceEvent(
LibraryErrorStatus,
"[ xdp] ERROR, %u, %s.",
GetLastError(),
"CxPlatEventQEnqueue failed (Shutdown)");
//
// Manually drop the partition’s ref since the completion callback
// won’t fire without the SQE being queued.
//
CxPlatDpRawRelease((XDP_DATAPATH*)Partition->Xdp);
}
return FALSE;
}

Expand Down
104 changes: 89 additions & 15 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
BOOLEAN StoppingThread : 1;
BOOLEAN StoppedThread : 1;
BOOLEAN DestroyedThread : 1;
BOOLEAN EventQClosed : 1;
#if DEBUG // Debug flags - Must not be in the bitfield.
BOOLEAN ThreadStarted;
BOOLEAN ThreadFinished;
Expand Down Expand Up @@ -218,14 +219,47 @@ CxPlatWorkerPoolInitWorker(
return TRUE;
}

//
// Tears down the worker's event queue together with every SQE that was
// initialized against it. Each "Initialized*" flag is cleared as its resource
// is released, so a second call on the same worker finds nothing left to do.
//
void
CxPlatWorkerCleanupEventQueue(
    _In_ CXPLAT_WORKER* Worker
    )
{
    if (Worker->InitializedEventQ) {

        //
        // Release the SQEs first, while the event queue they belong to is
        // still valid.
        //
        if (Worker->InitializedUpdatePollSqe) {
            CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe);
            Worker->InitializedUpdatePollSqe = FALSE;
        }

        if (Worker->InitializedWakeSqe) {
            CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe);
            Worker->InitializedWakeSqe = FALSE;
        }

        if (Worker->InitializedShutdownSqe) {
            CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe);
            Worker->InitializedShutdownSqe = FALSE;
        }

        //
        // Record that the queue is closed before actually cleaning it up,
        // then mark it as no longer initialized.
        //
        Worker->EventQClosed = TRUE;
        CxPlatEventQCleanup(&Worker->EventQ);
        Worker->InitializedEventQ = FALSE;
    }
}

void
CxPlatWorkerPoolDestroyWorker(
_In_ CXPLAT_WORKER* Worker
)
{
if (Worker->InitializedThread) {
Worker->StoppingThread = TRUE;
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe);
if (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"CxPlatEventQEnqueue(shutdown) Manually closing the event queue.");
CxPlatWorkerCleanupEventQueue(Worker);
}
CxPlatThreadWait(&Worker->Thread);
CxPlatThreadDelete(&Worker->Thread);
#if DEBUG
Expand All @@ -236,18 +270,9 @@ CxPlatWorkerPoolDestroyWorker(
} else {
// TODO - Handle synchronized cleanup for external event queues?
}
if (Worker->InitializedUpdatePollSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe);
}
if (Worker->InitializedWakeSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe);
}
if (Worker->InitializedShutdownSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe);
}
if (Worker->InitializedEventQ) {
CxPlatEventQCleanup(&Worker->EventQ);
}

CxPlatWorkerCleanupEventQueue(Worker);

if (Worker->InitializedECLock) {
CxPlatLockUninitialize(&Worker->ECLock);
}
Expand Down Expand Up @@ -498,7 +523,19 @@ CxPlatWorkerPoolAddExecutionContext(
CxPlatLockRelease(&Worker->ECLock);

if (QueueEvent) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe);
if (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe)) {
QuicTraceEvent(
LibraryErrorStatus,
"[ lib] ERROR, %u, %s.",
GetLastError(),
"CxPlatEventQEnqueue failed (updatepoll)");

//
// The event queue isn’t able to deliver the SQE, so execute the
// completion inline to move the pending contexts into the active list.
//
CxPlatUpdateExecutionContexts(Worker);
}
}
}

Expand All @@ -509,7 +546,19 @@ CxPlatWakeExecutionContext(
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context->CxPlatContext;
if (!InterlockedFetchAndSetBoolean(&Worker->Running)) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe);
if (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe)) {
QuicTraceEvent(
LibraryErrorStatus,
"[ lib] ERROR, %u, %s.",
GetLastError(),
"CxPlatEventQEnqueue failed (wake)");

//
// Failed to wake the worker, so clear the flag so a future attempt
// can retry once the caller observes the failure.
//
InterlockedFetchAndClearBoolean(&Worker->Running);
}
}
}

Expand Down Expand Up @@ -691,6 +740,27 @@ CxPlatProcessEvents(
Cqes,
ARRAYSIZE(Cqes),
Worker->State.WaitTime);

if (CqeCount == 0) {
if (Worker->EventQClosed) {
Worker->StoppedThread = TRUE;
return;
}
}
#if _WIN32
DWORD Err = GetLastError();
if (Err == ERROR_ABANDONED_WAIT_0 || Err == ERROR_INVALID_HANDLE) {
Worker->EventQClosed = TRUE;
Worker->StoppedThread = TRUE;
return;
}
#elif defined(CX_PLATFORM_LINUX) || defined(CX_PLATFORM_DARWIN)
if (errno == EBADF || errno == EINVAL) {
Worker->EventQClosed = TRUE;
Worker->StoppedThread = TRUE;
return;
}
#endif
uint32_t CurrentCqeCount = CqeCount;
CXPLAT_CQE* CurrentCqe = Cqes;

Expand Down Expand Up @@ -764,6 +834,10 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)

CxPlatProcessEvents(Worker);

if (Worker->EventQClosed && Worker->StoppedThread) {
break;
}

if (Worker->State.NoWorkCount == 0) {
Worker->State.LastWorkTime = Worker->State.TimeNow;
} else if (Worker->State.NoWorkCount > CXPLAT_WORKER_IDLE_WORK_THRESHOLD_COUNT) {
Expand Down
Loading