다음을 통해 공유


고성능 HTTP 서버 코드 예제

이 항목의 코드 예제는 여러 스레드에서 HTTP Server API를 비동기적으로 사용하여 서버 쪽 작업을 수행할 때 고성능 및 처리량을 달성하는 방법을 보여 주는 앱으로 빌드됩니다.

#ifndef UNICODE
#define UNICODE
#endif

#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif

#include <windows.h>
#include <winternl.h>
#include <stdio.h>
#include <conio.h>
#include <http.h>

#pragma comment(lib, "httpapi.lib")
#pragma comment(lib, "ntdll.lib")

//
// Macros
//

// Allocate cb bytes from the process default heap. No HEAP_ZERO_MEMORY flag
// is passed, so callers that need zeroed memory must clear it themselves.
#define ALLOC_MEM(cb) HeapAlloc(GetProcessHeap(), 0, (cb))

// Return a block obtained with ALLOC_MEM to the process default heap.
#define FREE_MEM(ptr) HeapFree(GetProcessHeap(), 0, (ptr))

// Number of elements in a string literal (or character array) excluding the
// terminator. The argument is parenthesized so that any array-valued
// expression can be passed safely. Only meaningful for real arrays, never
// for pointers.
#define STRLEN_LIT(x) ((sizeof(x) / sizeof((x)[0])) - 1)

// Size of the thread-handle array in wmain; it is also the handle count
// passed to WaitForMultipleObjects there.
#define LISTENER_THREAD_COUNT            MAXIMUM_WAIT_OBJECTS

// Capacity of the OVERLAPPED_ENTRY array dequeued per call in WaitForCompletions.
#define MAX_COMPLETIONS_PER_DEQUEUE                         4

// Longest stretch (in GetTickCount units, i.e. milliseconds) a listener keeps
// processing its local completion list before polling the completion port.
#define MAX_INLINE_COMPLETION_SPIN_TIME                   500

//
// Global definitions
//

// Stop request flag: written by wmain, polled by the stats thread and every
// listener thread. Declared volatile so each poll performs a real load and
// the compiler cannot cache the value across loop iterations.
volatile BOOL g_StopListeners = FALSE;

// HTTP Server API request queue shared by all listener threads.
HANDLE g_RequestQueue = NULL;

// Number of receives each listener thread posts before entering its loop.
DWORD g_InitialReceivesPerThread = 4;

// Completion port the request queue is associated with.
HANDLE g_CompletionPort = NULL;

// Length in bytes of the canned response body, and the body itself
// (allocated and filled in wmain).
ULONG g_ResponseSize = 512;
PBYTE g_ResponseData = NULL;

// Process-wide counters, updated with InterlockedIncrement and printed by
// the stats thread.
DWORD g_RequestReceived = 0;
DWORD g_ResponseSent = 0;

// Default size in bytes of the buffer that receives an HTTP_REQUEST.
#define REQUEST_BUFFER_SIZE   4096

// Identifies which kind of I/O an IO_CONTEXT currently represents, so the
// listener loop can dispatch a completed context to the right handler.
typedef enum _IO_CONTEXT_STATE
{
    IoStateIdle,    // Freshly allocated, or finished and about to be freed.
    IoStateReceive, // A HttpReceiveHttpRequest was issued with this context.
    IoStateSend,    // A HttpSendHttpResponse was issued with this context.
} IO_CONTEXT_STATE, * PIO_CONTEXT_STATE;

// Per-operation state for one receive or send. The structure is allocated
// together with a trailing request buffer (see AllocateIoContext).
typedef struct _IO_CONTEXT
{
    IO_CONTEXT_STATE State;          // Which operation this context tracks.
    OVERLAPPED Overlapped;           // Passed to the HTTP API; completions map back to the context via CONTAINING_RECORD.
    LIST_ENTRY ListEntry;            // Links the context into a worker's CompletionList.
    ULONG IoResult;                  // Win32 error code the operation completed with.
    HTTP_RESPONSE Response;          // Response descriptor filled in by SendResponse.
    HTTP_DATA_CHUNK DataChunk;       // Single entity chunk referenced by Response.
    PHTTP_REQUEST Request;           // Points at the request buffer that follows this structure.
    ULONG RequestBufferSize;         // Size in bytes of the buffer Request points to.
    DWORD NumberOfBytesTransferred;  // Byte count reported for the completed operation.
} IO_CONTEXT, * PIO_CONTEXT;

// Per-listener-thread counters. They live in the thread's own WORKER_BLOCK
// and are updated without interlocked operations.
typedef struct _WORKER_STATS
{
    ULONGLONG InlineReceives;    // Receives that returned ERROR_SUCCESS / ERROR_MORE_DATA from the call itself.
    ULONGLONG AsyncReceives;     // Receives that returned ERROR_IO_PENDING.
    ULONGLONG InlineSends;       // Sends that returned ERROR_SUCCESS from the call itself.
    ULONGLONG AsyncSends;        // Sends that returned ERROR_IO_PENDING.
    ULONGLONG RequestsReceived;  // Requests that reached the response stage.
} WORKER_STATS, * PWORKER_STATS;

// State private to one listener thread.
typedef struct _WORKER_BLOCK
{
    LIST_ENTRY CompletionList;  // Completed IO_CONTEXTs waiting to be processed by this thread.
    WORKER_STATS Stats;         // This thread's counters.
} WORKER_BLOCK, * PWORKER_BLOCK;

//
// Prototypes.
//

// Thread routine: posts receives and processes I/O completions until told to stop.
DWORD
WINAPI
HttpListenerThread(
    __in VOID* pThreadParams
);

// Thread routine: periodically prints the global request/response counters.
DWORD
WINAPI
DisplayStatsThread(
    __in VOID* pThreadParams
);

// Allocates a context and issues HttpReceiveHttpRequest for RequestId
// using a buffer of RequestBufferSize bytes.
ULONG
InitiateReceive(
    __inout PWORKER_BLOCK Worker,
    __in HTTP_REQUEST_ID RequestId,
    __in ULONG RequestBufferSize
);

// Handles a completed receive: sends the response and posts the next receive.
ULONG
ProcessReceiveCompletion(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
);

// Builds the canned response in Context and issues HttpSendHttpResponse.
ULONG
SendResponse(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
);

// Handles a completed send and releases its context.
VOID
ProcessSendCompletion(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
);

// Dequeues completions from the completion port (waiting up to Timeout
// milliseconds) and appends them to the worker's completion list.
ULONG
WaitForCompletions(
    __inout PWORKER_BLOCK Worker,
    __in ULONG Timeout
);

// Allocates a zeroed IO_CONTEXT followed by a request buffer of at least
// RequestBufferSize bytes. Returns NULL on failure. The parameter list now
// matches the definition; previously the prototype declared no parameters.
PIO_CONTEXT
AllocateIoContext(
    __in ULONG RequestBufferSize
);

// Releases a context obtained from AllocateIoContext.
VOID
FreeIoContext(
    __in PIO_CONTEXT Context
);

//
// Utility functions
//

// Appends Entry at the tail of the circular doubly-linked list ListHead.
VOID
InsertTailList(
    __inout PLIST_ENTRY ListHead,
    __inout PLIST_ENTRY Entry
);

// Returns TRUE when the list headed by ListHead has no entries.
BOOLEAN
IsListEmpty(
    __in const LIST_ENTRY* ListHead
);

// Makes ListHead an empty list (both links point at the head itself).
VOID
InitializeListHead(
    __out PLIST_ENTRY ListHead
);

// Unlinks and returns the first entry of the list headed by ListHead.
PLIST_ENTRY
RemoveHeadList(
    __inout PLIST_ENTRY ListHead
);

//
// Program entry point.
//
// Initializes the HTTP Server API, creates the request queue, server session
// and URL group, registers every URL given on the command line, binds the
// URL group to the queue, creates the completion port, starts the stats
// thread plus the listener threads, and waits for a key press. It then
// requests shutdown, waits for the threads and releases the resources.
//
// Arguments:
//   argc/argv - argv[1..argc-1] are the URLs to listen on.
//
// Returns -1 when no URL was supplied; otherwise the value left in Result
// by the last operation (a Win32 error code, ERROR_SUCCESS on a clean run).
//
int __cdecl wmain(
    int argc,
    wchar_t* argv[]
)
{
    DWORD                  Result = ERROR_SUCCESS;
    DWORD                  WaitResult = 0;
    HTTPAPI_VERSION        HttpApiVersion = HTTPAPI_VERSION_2;
    DWORD                  Index = 0;
    ULONG                  ThreadCount = LISTENER_THREAD_COUNT;
    HANDLE                 ThreadHandles[LISTENER_THREAD_COUNT] = {};
    HTTP_URL_GROUP_ID      UrlGroupId = HTTP_NULL_ID;
    HTTP_SERVER_SESSION_ID SessionId = HTTP_NULL_ID;
    HTTP_BINDING_INFO      BindingInfo = {};

    // At least one URL is required; print usage otherwise.
    if (argc < 2)
    {
        wprintf(L"%ws: <Url1> [Url2] ... \n", argv[0]);
        return -1;
    }

    //
    // Initialize HTTP Server APIs.
    //

    Result = HttpInitialize(
        HttpApiVersion,
        HTTP_INITIALIZE_SERVER,
        NULL);
    if (Result != ERROR_SUCCESS)
    {
        // Nothing has been acquired yet, so return directly instead of
        // going through the Exit cleanup (which calls HttpTerminate).
        wprintf(L"HttpInitialize failed with %d\n", Result);
        return Result;
    }

    //
    // Create a request queue.
    //

    Result = HttpCreateRequestQueue(
        HttpApiVersion,
        NULL,
        NULL,
        0,
        &g_RequestQueue);
    if (Result != ERROR_SUCCESS)
    {
        wprintf(L"HttpCreateRequestQueue failed %d\n", Result);
        goto Exit;
    }

    //
    // Create the HTTP Server session.
    //

    Result = HttpCreateServerSession(HttpApiVersion, &SessionId, 0);
    if (Result != ERROR_SUCCESS)
    {
        wprintf(L"HttpCreateServerSession failed %d\n", Result);
        goto Exit;
    }

    //
    // Create a URL group.
    //

    Result = HttpCreateUrlGroup(SessionId, &UrlGroupId, 0);
    if (Result != ERROR_SUCCESS)
    {
        wprintf(L"HttpCreateUrlGroup failed %d\n", Result);
        goto Exit;
    }

    //
    // Add our URLs to the URL group.
    //
    // The URI is a fully-qualified URI, and must include the
    // terminating (/) character.
    //

    for (Index = 1; Index < (DWORD) argc; Index++)
    {
        wprintf(L"Listening for requests on the following url: %s\n", argv[Index]);

        Result = HttpAddUrlToUrlGroup(UrlGroupId, argv[Index], 0, 0);
        if (Result != ERROR_SUCCESS)
        {
            wprintf(L"HttpAddUrlToUrlGroup for %s failed %d\n", argv[Index], Result);
            goto Exit;
        }
    }

    //
    // Bind the URL group to the request queue.
    //

    ZeroMemory(&BindingInfo, sizeof(BindingInfo));
    BindingInfo.Flags.Present = 1;
    BindingInfo.RequestQueueHandle = g_RequestQueue;

    Result = HttpSetUrlGroupProperty(
        UrlGroupId,
        HttpServerBindingProperty,
        &BindingInfo,
        sizeof(BindingInfo));
    if (Result != ERROR_SUCCESS)
    {
        wprintf(L"HttpSetUrlGroupProperty failed %d\n", Result);
        goto Exit;
    }

    //
    // Initialize the response buffer.
    //
    // The body is g_ResponseSize bytes of repeating ASCII digits "0".."9";
    // one extra byte is allocated for a terminating NUL.
    //
    // NOTE(review): g_ResponseData is not released anywhere in this
    // function - confirm whether that is intentional.
    //

    g_ResponseData = (PBYTE) ALLOC_MEM(g_ResponseSize + 1);
    if (g_ResponseData == NULL)
    {
        printf("out of memory\n");
        Result = ERROR_OUTOFMEMORY;
        goto Exit;
    }

    for (Index = 0; Index < g_ResponseSize; Index++)
    {
        g_ResponseData[Index] = (Index % 10) + '0';
    }
    g_ResponseData[Index] = '\0';

    //
    // Create completion port to receive IO completions.
    //

    //
    // We don't want to potentially switch the threads thru
    // completion ports when processing the inline completions.
    //

    if (!SetFileCompletionNotificationModes(
            g_RequestQueue,
            FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE))
    {
        Result = GetLastError();
        printf("SetFileCompletionNotificationModes failed %d\n", Result);
        goto Exit;
    }

    g_CompletionPort = CreateIoCompletionPort(
        g_RequestQueue,
        NULL,
        0,
        ThreadCount);

    if (g_CompletionPort == NULL)
    {
        Result = GetLastError();
        printf("CreateIoCompletionPort failed %d\n", Result);
        goto Exit;
    }

    //
    // Launch stats display thread.
    //
    // It occupies slot 0 of ThreadHandles; the listener threads use the
    // remaining slots.
    //

    ThreadHandles[0] = CreateThread(
        NULL,
        0,
        DisplayStatsThread,
        NULL,
        0,
        NULL);

    if (ThreadHandles[0] == NULL)
    {
        Result = GetLastError();
        printf("CreateThread failed %d\n", Result);
        goto Exit;
    }

    //
    // Launch request/response I/O threads.
    //

    for (Index = 1; Index < ThreadCount; Index++)
    {
        ThreadHandles[Index] = CreateThread(
            NULL,
            0,
            HttpListenerThread,
            (PVOID)(SIZE_T)Index,
            0,
            NULL);

        if (ThreadHandles[Index] == NULL)
        {
            Result = GetLastError();
            printf("CreateThread failed %d\n", Result);
            goto Exit;
        }
    }

    //
    // Wait for user command to exit.
    //

    _getch();

    //
    // Signal I/O threads to stop processing the requests, and exit.
    //

    g_StopListeners = TRUE;

    //
    // Stop queuing requests, and cancel all outstanding requests.
    //

    Result = HttpShutdownRequestQueue(g_RequestQueue);
    if (Result != ERROR_SUCCESS)
    {
        wprintf(L"HttpShutdownRequestQueue failed with %lu \n", Result);
        goto Exit;
    }

    //
    // Wait for all I/O threads to exit.
    //
    // The wait covers every slot of ThreadHandles (stats thread included)
    // and gives up after 15 seconds.
    //

    WaitResult = WaitForMultipleObjects(LISTENER_THREAD_COUNT, ThreadHandles, TRUE, 15000);
    if (WaitResult == WAIT_FAILED)
    {
        Result = GetLastError();
        printf("WaitForMultipleObjects failed %d\n", Result);
        goto Exit;
    }

    if (WaitResult == WAIT_TIMEOUT)
    {
        printf("WaitForMultipleObjects timed out\n");
        goto Exit;
    }

Exit:

    //
    // NOTE(review): the error paths taken after threads were started
    // (CreateThread failure, shutdown failure, wait failure/timeout) reach
    // this cleanup without having waited for those threads to finish.
    //

    //
    // Close the request queue handle.
    //

    if (g_RequestQueue)
    {
        HttpCloseRequestQueue(g_RequestQueue);
        g_RequestQueue = NULL;
    }

    //
    // Close the URL group.
    //

    if (UrlGroupId != HTTP_NULL_ID)
    {
        HttpCloseUrlGroup(UrlGroupId);
        UrlGroupId = HTTP_NULL_ID;
    }

    //
    // Close the server session.
    //

    if (SessionId != HTTP_NULL_ID)
    {
        HttpCloseServerSession(SessionId);
        SessionId = HTTP_NULL_ID;
    }

    //
    // Call HttpTerminate.
    //

    HttpTerminate(HTTP_INITIALIZE_SERVER, NULL);

    //
    // Close the completion port handle.
    //

    if (g_CompletionPort)
    {
        CloseHandle(g_CompletionPort);
        g_CompletionPort = NULL;
    }

    //
    // Close the I/O threads' handles.
    //

    for (Index = 0; Index < ThreadCount; Index++)
    {
        if (ThreadHandles[Index] != NULL)
        {
            CloseHandle(ThreadHandles[Index]);
            ThreadHandles[Index] = NULL;
        }
    }

    return Result;
}

//
// Show some statistics.
//

//
// Thread routine that reports progress on stdout.
//
// Prints a banner and the current counters once, then - until the stop flag
// is raised - sleeps three seconds, prints the counters again and flushes
// stdout. The thread parameter is unused. Always returns 0.
//
DWORD
WINAPI
DisplayStatsThread(
    __in VOID* /*pThreadParams*/
)
{
    printf("Press any key to exit ...\r\n");
    printf("Requests Received,   Responses Sent\r\n");

    // Initial snapshot, taken before the first sleep.
    printf("%17u,%17u\r\n", g_RequestReceived, g_ResponseSent);

    for (;;)
    {
        if (g_StopListeners)
        {
            break;
        }

        Sleep(3000);

        printf("%17u,%17u\r\n", g_RequestReceived, g_ResponseSent);
        fflush(stdout);
    }

    return 0;
}


//
// Listener thread routine.
//
// Posts g_InitialReceivesPerThread receives, then loops until the stop flag
// is raised: it drains the thread-local completion list (dispatching each
// context by its State) and afterwards blocks on the completion port for up
// to five seconds to collect further completions.
//
// The thread parameter is unused. Always returns 0.
//
DWORD
WINAPI
HttpListenerThread(
    __in VOID* /*pThreadParams*/
)
{
    ULONG Result = ERROR_SUCCESS;
    ULONG Index = 0;
    WORKER_BLOCK Worker = { 0 };
    PIO_CONTEXT Context = NULL;
    ULONG TickCount = 0;
    ULONG CompletionCheckTimestamp = 0;

    InitializeListHead(&Worker.CompletionList);

    //
    // Posting initial receives.
    //

    for (Index = 0; Index < g_InitialReceivesPerThread; Index++)
    {
        Result = InitiateReceive(
            &Worker,
            HTTP_NULL_ID,
            REQUEST_BUFFER_SIZE);
        if (Result != ERROR_SUCCESS)
        {
            printf("InitiateReceive faield with %d\n", Result);
            goto Exit;
        }
    }

    while (!g_StopListeners)
    {
        CompletionCheckTimestamp = GetTickCount();

        //
        // Process the completions.
        //

        while (!IsListEmpty(&Worker.CompletionList))
        {
            if (g_StopListeners)
            {
                goto Exit;
            }

            Context = CONTAINING_RECORD(
                RemoveHeadList(&Worker.CompletionList),
                IO_CONTEXT,
                ListEntry);

            //
            // Both handlers take ownership of the context: they either free
            // it or hand it to a new asynchronous operation.
            //

            switch (Context->State)
            {
                case IoStateReceive:
                    ProcessReceiveCompletion(Context, &Worker);
                    break;

                case IoStateSend:
                    ProcessSendCompletion(Context, &Worker);
                    break;

                default:
                    printf("Invalid IoContext state %d\n", Context->State);

                    //
                    // The context is already off the list, so the cleanup
                    // loop at Exit would never see it; release it here.
                    //

                    FreeIoContext(Context);
                    Context = NULL;
                    goto Exit;
            }

            //
            // To prevent inline completions from starving the
            // completion port, we check the completion port regularly.
            // The signed cast keeps the comparison valid across a
            // GetTickCount wrap-around.
            //

            TickCount = GetTickCount();
            if ((LONG)(TickCount - CompletionCheckTimestamp) > MAX_INLINE_COMPLETION_SPIN_TIME)
            {
                CompletionCheckTimestamp = TickCount;
                WaitForCompletions(&Worker, 0);
            }
        }

        //
        // Wait for completions.
        //

        Result = WaitForCompletions(&Worker, 5000);
        if (Result != ERROR_SUCCESS)
        {
            printf("WaitForCompletions faield with %d\n", Result);
            goto Exit;
        }
    }

Exit:

    //
    // Clean up any unprocessed I/O context.
    //

    while (!IsListEmpty(&Worker.CompletionList))
    {
        Context = CONTAINING_RECORD(
            RemoveHeadList(&Worker.CompletionList),
            IO_CONTEXT,
            ListEntry);

        FreeIoContext(Context);
    }

    return 0;
}


//
// Allocates an IO_CONTEXT together with its trailing request buffer.
//
// Arguments:
//   RequestBufferSize - requested size in bytes of the request buffer; it is
//                       raised to sizeof(HTTP_REQUEST) when smaller.
//
// Returns the zero-initialized context (State = IoStateIdle, Request pointing
// just past the structure), or NULL when the total size would not fit in a
// ULONG or the heap allocation fails. Release with FreeIoContext.
//
PIO_CONTEXT
AllocateIoContext(
    ULONG RequestBufferSize
)
{
    PIO_CONTEXT Context = NULL;
    ULONG ContextSize = 0;

    //
    // The request buffer size should be at least big enough for an HTTP_REQUEST structure.
    //

    if (sizeof(HTTP_REQUEST) > RequestBufferSize)
    {
        RequestBufferSize = sizeof(HTTP_REQUEST);
    }

    //
    // Refuse sizes for which header + buffer would wrap around a ULONG and
    // silently produce an undersized allocation.
    //

    if (RequestBufferSize > ((ULONG)-1) - (ULONG)sizeof(IO_CONTEXT))
    {
        return NULL;
    }

    ContextSize = sizeof(IO_CONTEXT) + RequestBufferSize;

    Context = (PIO_CONTEXT)ALLOC_MEM(ContextSize);
    if (Context == NULL)
    {
        return NULL;
    }

    // ALLOC_MEM does not zero the block.
    RtlZeroMemory(Context, ContextSize);

    Context->State = IoStateIdle;
    Context->Request = (PHTTP_REQUEST)((PCHAR)Context + sizeof(IO_CONTEXT));
    Context->RequestBufferSize = RequestBufferSize;

    return Context;
}


//
// Returns a context (and the request buffer allocated with it) to the heap.
// The result of the underlying free call is deliberately not inspected.
//
VOID
FreeIoContext(
    __in PIO_CONTEXT Context
)
{
    (VOID)FREE_MEM(Context);
}


//
// Posts one receive on the request queue.
//
// Arguments:
//   Worker            - calling thread's worker block; receives the context
//                       when the call completes inline and has its stats updated.
//   RequestId         - HTTP_NULL_ID for a new request, or the id of a
//                       request whose remaining data should be read.
//   RequestBufferSize - size in bytes of the request buffer to allocate.
//
// Returns ERROR_SUCCESS when the receive completed inline (context queued on
// Worker->CompletionList) or is pending (context owned by the I/O). It also
// returns ERROR_SUCCESS for ERROR_INVALID_HANDLE seen while shutting down.
// Any other value is a Win32 error; in that case no context remains allocated.
//
ULONG
InitiateReceive(
    __inout PWORKER_BLOCK Worker,
    __in HTTP_REQUEST_ID RequestId,
    __in ULONG RequestBufferSize
)
{
    ULONG Result = ERROR_SUCCESS;
    PIO_CONTEXT Context = NULL;
    ULONG BytesTransferred = 0;

    for (;;)
    {
        Context = AllocateIoContext(RequestBufferSize);
        if (Context == NULL)
        {
            printf("out of memory\n");
            Result = ERROR_OUTOFMEMORY;
            goto Exit;
        }

        Context->State = IoStateReceive;

        Result = HttpReceiveHttpRequest(
            g_RequestQueue,
            RequestId,
            0,
            Context->Request,
            Context->RequestBufferSize,
            &BytesTransferred,
            &Context->Overlapped);

        //
        // Check return status.
        //

        switch (Result)
        {
            case ERROR_MORE_DATA:
            case ERROR_SUCCESS:

                //
                // The call might fail with ERROR_MORE_DATA if the buffer wasn't large enough for the request.
                // The buffer size required to read the remaining part of the request is returned in the BytesTransferred.
                //

                Worker->Stats.InlineReceives += 1;

                Context->NumberOfBytesTransferred = BytesTransferred;

                //
                // Completed inline, put the context to the processing queue.
                // The original status is kept in IoResult so that the
                // completion handler can react to ERROR_MORE_DATA.
                //

                Context->IoResult = Result;

                InsertTailList(&Worker->CompletionList, &Context->ListEntry);

                Result = ERROR_SUCCESS;

                goto Exit;

            case ERROR_IO_PENDING:

                //
                // The I/O now owns the context; it comes back through the
                // completion port.
                //

                Worker->Stats.AsyncReceives += 1;

                Result = ERROR_SUCCESS;

                goto Exit;

            case ERROR_CONNECTION_INVALID:

                //
                // The error means that the client closed the underlying connection.
                // In that case we issue a new HttpReceiveHttpRequest to keep requests flowing.
                //

                if (RequestId == HTTP_NULL_ID)
                {
                    printf("HttpReceiveHttpRequest returned an unexpected error for HTTP_NULL_ID %d\n", Result);

                    //
                    // No I/O was started with this context and it is on no
                    // list, so it must be released before bailing out.
                    //

                    FreeIoContext(Context);
                    Context = NULL;

                    goto Exit;
                }

                //
                // Free the current I/O context.
                //

                FreeIoContext(Context);
                Context = NULL;

                RequestId = HTTP_NULL_ID;
                BytesTransferred = 0;

                //
                // Receive new request with the default buffer size.
                //

                RequestBufferSize = REQUEST_BUFFER_SIZE;

                break;

            default:

                //
                // Free async I/O context
                //

                FreeIoContext(Context);
                Context = NULL;

                //
                // Because of a race condition with cancelation routine, the call can fail with ERROR_INVALID_HANDLE.
                // That can be avoided if we synchronize HttpReceiveHttpRequest and HttpShutdownRequestQueue calls.
                //

                if (g_StopListeners &&
                    Result == ERROR_INVALID_HANDLE)
                {
                    Result = ERROR_SUCCESS;
                    goto Exit;
                }

                printf("HttpReceiveHttpRequest initiate failed %d\n", Result);
                goto Exit;
        }
    }

Exit:

    return Result;
}


//
// Handles a completed receive.
//
// Takes ownership of Context: on every path it is either freed here or
// handed over to the send operation started by SendResponse.
//
// Arguments:
//   Context - completed receive context (IoResult / NumberOfBytesTransferred set).
//   Worker  - calling thread's worker block.
//
// Returns ERROR_SUCCESS when a follow-up receive was posted, otherwise the
// Win32 error that stopped processing.
//
ULONG
ProcessReceiveCompletion(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
)
{
    ULONG Result = ERROR_SUCCESS;
    // Defaults describe a brand-new receive; the ERROR_MORE_DATA path
    // overrides both to continue reading the same request.
    HTTP_REQUEST_ID RequestId = HTTP_NULL_ID;
    ULONG RequestBufferSize = REQUEST_BUFFER_SIZE;

    Result = Context->IoResult;

    //
    // The call might fail with ERROR_MORE_DATA if the buffer wasn't
    // large enough for the request. The required buffer size is returned
    // in the OVERLAPPED_ENTRY::dwNumberOfBytesTransferred (stored in
    // Context->NumberOfBytesTransferred). In that case, HttpReceiveHttpRequest
    // should be called again with the same Context->Request->RequestId,
    // and with a big enough buffer.
    //

    if (Result == ERROR_MORE_DATA)
    {
        // Copy what is needed out of the context before it is freed.
        RequestId = Context->Request->RequestId;
        RequestBufferSize = Context->NumberOfBytesTransferred;

        FreeIoContext(Context);
        Context = NULL;

        goto InitReceive;
    }

    if (Result != ERROR_SUCCESS &&
        Result != ERROR_HANDLE_EOF)
    {
        // Exit frees the context; no replacement receive is posted here.
        printf("HttpReceiveHttpRequest completion failed %d\n", Result);
        goto Exit;
    }

    //
    // The code example is expecting only the GET verb. Cancel the request.
    //

    if (Context->Request->Verb != HttpVerbGET)
    {
        printf("Receved an unexpected http verb %d\n", Context->Request->Verb);

        Result = HttpCancelHttpRequest(
            g_RequestQueue,
            Context->Request->RequestId,
            NULL);
        if (Result != ERROR_SUCCESS)
        {
            printf("HttpCancelHttpRequest failed %d\n", Result);
            goto Exit;
        }

        FreeIoContext(Context);
        Context = NULL;

        //
        // Post the next receive
        //

        goto InitReceive;
    }

    //
    // The implementations must handle the scenario when the Context->Request->Flags
    // has HTTP_REQUEST_FLAG_MORE_ENTITY_BODY_EXISTS bit set. In that case, additional
    // calls to HttpReceiveRequestEntityBody or HttpReceiveHttpRequest should be made until
    // all data are received.
    //

    //
    // Update the receive statistics, and issue the send response.
    //

    Worker->Stats.RequestsReceived += 1;
    InterlockedIncrement(&g_RequestReceived);

    Result = SendResponse(
        Context,
        Worker);

    if (Result != ERROR_SUCCESS)
    {
        if (Result == ERROR_CONNECTION_INVALID)
        {
            //
            // The error means that the client closed the underlying connection.
            // In that case we issue a new HttpReceiveHttpRequest to keep requests flowing.
            //

            FreeIoContext(Context);
            Context = NULL;

            goto InitReceive;
        }

        printf("SendResponse failed %d\n", Result);
        goto Exit;
    }

    //
    // SendResponse's async I/O took the ownership of the context.
    // Clearing the local pointer keeps the Exit path from freeing it.
    //

    Context = NULL;

InitReceive:

    //
    // Post the next receive without waiting for SendResponse's
    // completion, as we want to keep the receive pipeline active.
    //

    Result = InitiateReceive(
        Worker,
        RequestId,
        RequestBufferSize);
    if (Result != ERROR_SUCCESS)
    {
        printf("InitiateReceive failed %d\n", Result);
        goto Exit;
    }

Exit:

    // Only non-NULL on the error paths that still own the context.
    if (Context != NULL)
    {
        FreeIoContext(Context);
        Context = NULL;
    }

    return Result;
}


//
// Fills in the canned "200 OK" text/plain response inside Context and
// issues HttpSendHttpResponse for the request held by that context.
//
// Arguments:
//   Context - context holding the received request; reused for the send.
//   Worker  - calling thread's worker block (stats and completion list).
//
// Returns ERROR_SUCCESS when the send is pending or finished inline (in the
// latter case the context is queued on Worker->CompletionList). Any other
// value is the error returned by HttpSendHttpResponse; the caller then
// still owns Context.
//
ULONG
SendResponse(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
)
{
    ULONG SendStatus = ERROR_SUCCESS;

    //
    // Status line.
    //

    Context->Response.StatusCode = 200;
    Context->Response.pReason = "OK";
    Context->Response.ReasonLength = STRLEN_LIT("OK");

    //
    // Content-Type header.
    //

    Context->Response.Headers.KnownHeaders[HttpHeaderContentType].pRawValue = "text/plain";
    Context->Response.Headers.KnownHeaders[HttpHeaderContentType].RawValueLength = STRLEN_LIT("text/plain");

    //
    // Body: a single in-memory chunk referencing the shared response data.
    //

    Context->DataChunk.DataChunkType = HttpDataChunkFromMemory;
    Context->DataChunk.FromMemory.pBuffer = g_ResponseData;
    Context->DataChunk.FromMemory.BufferLength = g_ResponseSize;

    Context->Response.pEntityChunks = &Context->DataChunk;
    Context->Response.EntityChunkCount = 1;

    //
    // Mark the context as a send before handing it to the API, then send.
    // Small responses may finish inside the call itself.
    //

    Context->State = IoStateSend;

    SendStatus = HttpSendHttpResponse(
        g_RequestQueue,
        Context->Request->RequestId,
        0,
        &Context->Response,
        NULL,
        NULL,
        NULL,
        0,
        &Context->Overlapped,
        NULL);

    //
    // Account for the outcome.
    //

    switch (SendStatus)
    {
        case ERROR_IO_PENDING:
            Worker->Stats.AsyncSends += 1;
            SendStatus = ERROR_SUCCESS;
            break;

        case ERROR_SUCCESS:
            Worker->Stats.InlineSends += 1;

            //
            // Finished inline: queue the context for the send-completion handler.
            //

            Context->IoResult = ERROR_SUCCESS;
            InsertTailList(&Worker->CompletionList, &Context->ListEntry);
            break;

        default:
            printf("HttpSendHttpResponse failed in-line %d. Continuing...\n", SendStatus);
            break;
    }

    return SendStatus;
}


//
// Handles a completed send: counts it when it succeeded, reports the error
// otherwise, and releases the context in both cases.
//
// Arguments:
//   Context - completed send context; freed before returning.
//   Worker  - calling thread's worker block (not used by this routine).
//
VOID
ProcessSendCompletion(
    __inout PIO_CONTEXT Context,
    __inout PWORKER_BLOCK Worker
)
{
    (VOID)Worker;

    if (Context->IoResult == ERROR_SUCCESS)
    {
        InterlockedIncrement(&g_ResponseSent);
    }
    else
    {
        printf("HttpSendHttpResponse failed %d. Continuing...\n", Context->IoResult);
    }

    Context->State = IoStateIdle;
    FreeIoContext(Context);
}


//
// Pulls completed operations off the completion port and appends their
// contexts to the worker's completion list.
//
// Arguments:
//   Worker  - calling thread's worker block.
//   Timeout - how long to wait for completions, in milliseconds.
//
// Returns ERROR_SUCCESS when entries were dequeued or the wait timed out;
// otherwise the error reported by GetLastError after the failed dequeue.
//
ULONG
WaitForCompletions(
    __inout PWORKER_BLOCK Worker,
    __in ULONG Timeout
)
{
    OVERLAPPED_ENTRY Dequeued[MAX_COMPLETIONS_PER_DEQUEUE] = {};
    ULONG DequeuedCount = 0;
    ULONG Capacity = 0;
    ULONG Failure = ERROR_SUCCESS;
    ULONG Slot = 0;

    // Never ask for more entries than the local array can hold.
    Capacity = min(g_InitialReceivesPerThread, MAX_COMPLETIONS_PER_DEQUEUE);

    if (!GetQueuedCompletionStatusEx(
            g_CompletionPort,
            Dequeued,
            Capacity,
            &DequeuedCount,
            Timeout,
            FALSE))
    {
        Failure = GetLastError();

        // A timeout simply means there was nothing to collect.
        if (Failure == WAIT_TIMEOUT)
        {
            return ERROR_SUCCESS;
        }

        printf("GetQueuedCompletionStatusEx failed %d\n", Failure);
        return Failure;
    }

    while (Slot < DequeuedCount)
    {
        // The Internal field carries the NTSTATUS; translate it to a Win32 code.
        ULONG CompletionStatus = RtlNtStatusToDosError((NTSTATUS)Dequeued[Slot].Internal);

        // Recover the owning context from the embedded OVERLAPPED.
        PIO_CONTEXT Completed = CONTAINING_RECORD(
            Dequeued[Slot].lpOverlapped,
            IO_CONTEXT,
            Overlapped);

        Completed->IoResult = CompletionStatus;

        // The byte count is only recorded for these two statuses.
        if (CompletionStatus == ERROR_SUCCESS ||
            CompletionStatus == ERROR_MORE_DATA)
        {
            Completed->NumberOfBytesTransferred = Dequeued[Slot].dwNumberOfBytesTransferred;
        }

        InsertTailList(&Worker->CompletionList, &Completed->ListEntry);

        Slot++;
    }

    return ERROR_SUCCESS;
}

//
// Utility functions.
//

//
// Links Entry in as the last element of the circular doubly-linked list
// whose sentinel is ListHead.
//
VOID
InsertTailList(
    __inout PLIST_ENTRY ListHead,
    __inout PLIST_ENTRY Entry
)
{
    PLIST_ENTRY PreviousTail = ListHead->Blink;

    // The new entry sits between the old tail and the sentinel.
    Entry->Blink = PreviousTail;
    Entry->Flink = ListHead;

    PreviousTail->Flink = Entry;
    ListHead->Blink = Entry;
}

//
// Reports whether the list is empty, i.e. whether the sentinel's forward
// link points back at the sentinel itself.
//
BOOLEAN
IsListEmpty(
    __in const LIST_ENTRY* ListHead
)
{
    const LIST_ENTRY* First = ListHead->Flink;

    return (BOOLEAN)(First == ListHead);
}

//
// Turns ListHead into an empty list by pointing both of its links at itself.
//
VOID
InitializeListHead(
    __out PLIST_ENTRY ListHead
)
{
    ListHead->Blink = ListHead;
    ListHead->Flink = ListHead;
}

//
// Unlinks the first element of the list and returns it. When called on an
// empty list the sentinel itself is returned and the list stays empty.
//
PLIST_ENTRY
RemoveHeadList(
    __inout PLIST_ENTRY ListHead
)
{
    PLIST_ENTRY First = ListHead->Flink;
    PLIST_ENTRY Second = First->Flink;

    // Splice the sentinel directly to what used to be the second element.
    Second->Blink = ListHead;
    ListHead->Flink = Second;

    return First;
}