Issue with IO Completion port not returning buffer from WSARecv

2021-06-15T19:44:43.833+00:00

Having a issue with IO completion ports:

So getting a receive message on the IO completion port i set up, It triggers GetQueuedCompletionStatus and returns with the Completion Key and the Overlapped data. Both appear to be good and I can see data populated in their structures. However the Buffer which was passed to WSARecv was not populated with the incoming message. (the BytesTransfered indicates that there were bytes received, but no data in the WSABUF).

Here is the code as it currently stands, looking for help as to why the Buffer isn't being populated.

networkhandlerthread.ccp

#include "NetworkHandlerThread.h"

// Worker thread, processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF* DataBuf;
    DWORD RecvBytes = 0;
    Type1MessageParser Type1MsgParser;
    Type2MessageParser Type2MsgParser;
    int DestinationAddress = 0;
    bool IsType1 = false;

    while (TRUE)//run forever
    {
        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() failed with error %d\n", Err);

                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() failed with error %d\n", WSAGetLastError());
                    return 0;
                }

                GlobalFree(ConnectedSocketData);
            }
            continue;
        }


        //We have a message, determine if it's something we receaved or something we should send.
        if (PerIoData->OperationType == OPERATION_TYPE_RECV)
        {
            ///tbd process recv
            ConnectedSocketData; //this is comming in good and has data
            PerIoData->Buffer; // this is empty (pointer is good, but no data)
        }
        else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
        {
            ///tbd process send
        }
    }
};


//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) //run forever
    {
        //Wait for event
        dwRet = WSAWaitForMultipleEvents(1,
            &(pSocketData->hAcceptEvent),
            false,
            100,
            false);

        //Nothing happened, back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        //We got a event, find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
            pSocketData->hAcceptEvent,
            &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        //We got a Accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            //Check for errors
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {

                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = WSAAccept(pSocketData->Socket,
                    (LPSOCKADDR)&NewSockAddr,
                    &nLen, NULL, NULL);
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n", WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

                //Set new connection as TCP connection, No Delay
                //const char chOpt = 1;
                //int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
                //if (nErr == -1)
                //{
                //    wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
                //    break;
                //}


                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;

                //Add the new socket to the completion port, message from the socker will be queued up for proccessing by worker threads.
                if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }

                //Set the PerIOData, will be used at completion time
                LPPER_IO_OPERATION_DATA PerIoData;
                PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));

                ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
                PerIoData->BufferLen = 0;
                PerIoData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                PerIoData->Buffer.buf = PerIoData->cBuffer;
                PerIoData->Buffer.len = DATA_BUFSIZE;


                //Kick off the first Recv request for the Socket, will be handled by the completion Queue.
                if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n", WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
                break;
            }
        }
    }
}


NetworkHandlerThread::NetworkHandlerThread()
{
    m_CompletionPort = 0;
    m_hListenThread = 0;
}

NetworkHandlerThread::~NetworkHandlerThread()
{

}

void NetworkHandlerThread::StartNetworkHandler()
{
    int iResult = 0;
    SYSTEM_INFO SystemInfo;
    unsigned int i = 0;

    //Start WSA
    iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iResult != NO_ERROR) {
        wprintf(L"WSAStartup() failed with error: %d\n", iResult);
        return;
    }

    //Start Completion Port
    m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_CompletionPort != NULL)
    {
        wprintf(L"Completion Port Created\n");
    }

    //Get # of system processors
    GetSystemInfo(&SystemInfo);

    //create Worker Threads for each processor.
    for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
    {
        HANDLE ThreadHandle;

        // Create a server worker thread, and pass the
        // completion port to the thread. 
        ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, NULL);

        // Close the thread handle
        if (ThreadHandle != NULL)
        {
            CloseHandle(ThreadHandle);
        }
    }
}

void NetworkHandlerThread::AddListenThread(int Port,
    ConfigHandler* pConfigHandle,
    void* ForwardHandle)
{
    SOCKADDR_IN InternetAddr;
    int iResult = 0;
    LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;

    if (pListenSocketData == NULL)
    {
        return;
    }

    //Create the listener Socket
    pListenSocketData->Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (pListenSocketData->Socket == INVALID_SOCKET)
    {
        wprintf(L"socket function failed with error: %ld\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    // Create a Event to handle Socket Accepts
    pListenSocketData->hAcceptEvent = WSACreateEvent();
    if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
    {
        wprintf(L"WSACreateEvent() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    // Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
    int nRet = WSAEventSelect(pListenSocketData->Socket,
        pListenSocketData->hAcceptEvent,
        FD_ACCEPT);
    if (nRet == SOCKET_ERROR)
    {
        wprintf(L"WSAAsyncSelect() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    //Assign the Port Number
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    InternetAddr.sin_port = htons(Port);
    pListenSocketData->Port = Port;
    pListenSocketData->IOCP = m_CompletionPort;
    pListenSocketData->CfgHandle = pConfigHandle;
    pListenSocketData->ForwardMessager = ForwardHandle;

    //Bind the Socket to the Port
    iResult = ::bind((pListenSocketData->Socket), (sockaddr*)&InternetAddr, sizeof(InternetAddr));
    if (iResult == SOCKET_ERROR) {
        wprintf(L"bind function failed with error %d\n", WSAGetLastError());
        iResult = closesocket(pListenSocketData->Socket);
        if (iResult == SOCKET_ERROR)
            wprintf(L"closesocket function failed with error %d\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    //Listen for incomming connection requests.
    if (listen(pListenSocketData->Socket, SOMAXCONN) == SOCKET_ERROR)
    {
        wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        WSACleanup();
        return;
    }

    wprintf(L"Listening on %ld", Port);

    m_hListenThread = (HANDLE)CreateThread(NULL,                // Security
        0,                  // Stack size - use default
        ListenThread,  // Thread fn entry point
        (void*)pListenSocketData, //Listen Socket Data
        0,                  // Init flag
        NULL);  // Thread address
}

NetworkHandlerThread.h

#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

class NetworkHandlerThread
{
public:
    WSADATA wsaData;
    HANDLE m_CompletionPort;
    HANDLE m_hListenThread;

public:
    NetworkHandlerThread();
    ~NetworkHandlerThread();

    void StartNetworkHandler();

    void AddListenThread(int Port,
        ConfigHandler* pConfigHandle,
        void* ForwardHandle);
};

ThreadUtilities.h

#pragma once
#include <mutex>
#include "ConfigHandler.h"


using namespace std;

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;

typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;
    int    Port;
    HANDLE hAcceptEvent;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;

typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;
    int Port;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;
Windows API - Win32
Windows API - Win32
A core set of Windows application programming interfaces (APIs) for desktop and server applications. Previously known as Win32 API.
2,594 questions
C++
C++
A high-level, general-purpose programming language, created as an extension of the C programming language, that has object-oriented, generic, and functional features in addition to facilities for low-level memory manipulation.
3,694 questions
{count} votes

1 answer

Sort by: Most helpful
  1. Xiaopo Yang - MSFT 12,726 Reputation points Microsoft Vendor
    2021-06-16T07:07:58.89+00:00

    According to the Remarks of CreateIoCompletionPort function,

    Objects created by other functions such as socket can also be associated with an I/O completion port. For an example using sockets, see AcceptEx.

    This is the hyperlinks of the Example Code of AcceptEx function which uses overlapped I/O and completion ports.

    0 comments No comments

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.