Having a issue with IO completion ports:
So getting a receive message on the IO completion port i set up, It triggers GetQueuedCompletionStatus and returns with the Completion Key and the Overlapped data. Both appear to be good and I can see data populated in their structures. However the Buffer which was passed to WSARecv was not populated with the incoming message. (the BytesTransfered indicates that there were bytes received, but no data in the WSABUF).
Here is the code as it currently stands, looking for help as to why the Buffer isn't being populated.
networkhandlerthread.ccp
#include "NetworkHandlerThread.h"
// Worker thread, processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
HANDLE CompletionPort = (HANDLE)lpParam;
DWORD BytesTransferred = 0;
OVERLAPPED* lpOverlapped = NULL;
LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
LPPER_IO_OPERATION_DATA PerIoData = NULL;
DWORD Flags = 0;
WSABUF* DataBuf;
DWORD RecvBytes = 0;
Type1MessageParser Type1MsgParser;
Type2MessageParser Type2MsgParser;
int DestinationAddress = 0;
bool IsType1 = false;
while (TRUE)//run forever
{
//Check for new message
if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0)
{
DWORD Err = GetLastError();
if (Err != WAIT_TIMEOUT)
{
printf("GetQueuedCompletionStatus() failed with error %d\n", Err);
if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
{
printf("closesocket() failed with error %d\n", WSAGetLastError());
return 0;
}
GlobalFree(ConnectedSocketData);
}
continue;
}
//We have a message, determine if it's something we receaved or something we should send.
if (PerIoData->OperationType == OPERATION_TYPE_RECV)
{
///tbd process recv
ConnectedSocketData; //this is comming in good and has data
PerIoData->Buffer; // this is empty (pointer is good, but no data)
}
else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
{
///tbd process send
}
}
};
//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
WSANETWORKEVENTS NetworkEvents;
DWORD dwRet;
SOCKADDR_IN NewSockAddr;
SOCKET NewSocket;
int nLen;
while (true) //run forever
{
//Wait for event
dwRet = WSAWaitForMultipleEvents(1,
&(pSocketData->hAcceptEvent),
false,
100,
false);
//Nothing happened, back to top
if (dwRet == WSA_WAIT_TIMEOUT)
continue;
//We got a event, find out which one.
int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
pSocketData->hAcceptEvent,
&NetworkEvents);
if (nRet == SOCKET_ERROR)
{
wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
break;
}
//We got a Accept event
if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
{
//Check for errors
if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
{
// Accept new connection
nLen = sizeof(SOCKADDR_IN);
NewSocket = WSAAccept(pSocketData->Socket,
(LPSOCKADDR)&NewSockAddr,
&nLen, NULL, NULL);
if (NewSocket == SOCKET_ERROR)
{
wprintf(L"accept() error %ld\n", WSAGetLastError());
break;
}
wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);
//Set new connection as TCP connection, No Delay
//const char chOpt = 1;
//int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
//if (nErr == -1)
//{
// wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
// break;
//}
LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;
ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));
ConnectedSocketData->Socket = NewSocket;
ConnectedSocketData->Port = pSocketData->Port;
ConnectedSocketData->IOCP = pSocketData->IOCP;
ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;
//Add the new socket to the completion port, message from the socker will be queued up for proccessing by worker threads.
if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
{
wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
delete ConnectedSocketData;
ConnectedSocketData = NULL;
closesocket(NewSocket);
break;
}
//Set the PerIOData, will be used at completion time
LPPER_IO_OPERATION_DATA PerIoData;
PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
PerIoData->BufferLen = 0;
PerIoData->OperationType = OPERATION_TYPE_RECV;
DWORD RecvBytes = 0;
DWORD Flags = 0;
PerIoData->Buffer.buf = PerIoData->cBuffer;
PerIoData->Buffer.len = DATA_BUFSIZE;
//Kick off the first Recv request for the Socket, will be handled by the completion Queue.
if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
{
wprintf(L"WSARecv error %ld\n", WSAGetLastError());
return 0;
}
}
else
{
wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
break;
}
}
}
}
NetworkHandlerThread::NetworkHandlerThread()
{
m_CompletionPort = 0;
m_hListenThread = 0;
}
NetworkHandlerThread::~NetworkHandlerThread()
{
}
void NetworkHandlerThread::StartNetworkHandler()
{
int iResult = 0;
SYSTEM_INFO SystemInfo;
unsigned int i = 0;
//Start WSA
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != NO_ERROR) {
wprintf(L"WSAStartup() failed with error: %d\n", iResult);
return;
}
//Start Completion Port
m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_CompletionPort != NULL)
{
wprintf(L"Completion Port Created\n");
}
//Get # of system processors
GetSystemInfo(&SystemInfo);
//create Worker Threads for each processor.
for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
{
HANDLE ThreadHandle;
// Create a server worker thread, and pass the
// completion port to the thread.
ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, NULL);
// Close the thread handle
if (ThreadHandle != NULL)
{
CloseHandle(ThreadHandle);
}
}
}
void NetworkHandlerThread::AddListenThread(int Port,
ConfigHandler* pConfigHandle,
void* ForwardHandle)
{
SOCKADDR_IN InternetAddr;
int iResult = 0;
LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;
if (pListenSocketData == NULL)
{
return;
}
//Create the listener Socket
pListenSocketData->Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (pListenSocketData->Socket == INVALID_SOCKET)
{
wprintf(L"socket function failed with error: %ld\n", WSAGetLastError());
WSACleanup();
return;
}
// Create a Event to handle Socket Accepts
pListenSocketData->hAcceptEvent = WSACreateEvent();
if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
{
wprintf(L"WSACreateEvent() error %ld\n", WSAGetLastError());
closesocket(pListenSocketData->Socket);
return;
}
// Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
int nRet = WSAEventSelect(pListenSocketData->Socket,
pListenSocketData->hAcceptEvent,
FD_ACCEPT);
if (nRet == SOCKET_ERROR)
{
wprintf(L"WSAAsyncSelect() error %ld\n", WSAGetLastError());
closesocket(pListenSocketData->Socket);
return;
}
//Assign the Port Number
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(Port);
pListenSocketData->Port = Port;
pListenSocketData->IOCP = m_CompletionPort;
pListenSocketData->CfgHandle = pConfigHandle;
pListenSocketData->ForwardMessager = ForwardHandle;
//Bind the Socket to the Port
iResult = ::bind((pListenSocketData->Socket), (sockaddr*)&InternetAddr, sizeof(InternetAddr));
if (iResult == SOCKET_ERROR) {
wprintf(L"bind function failed with error %d\n", WSAGetLastError());
iResult = closesocket(pListenSocketData->Socket);
if (iResult == SOCKET_ERROR)
wprintf(L"closesocket function failed with error %d\n", WSAGetLastError());
WSACleanup();
return;
}
//Listen for incomming connection requests.
if (listen(pListenSocketData->Socket, SOMAXCONN) == SOCKET_ERROR)
{
wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
closesocket(pListenSocketData->Socket);
WSACleanup();
return;
}
wprintf(L"Listening on %ld", Port);
m_hListenThread = (HANDLE)CreateThread(NULL, // Security
0, // Stack size - use default
ListenThread, // Thread fn entry point
(void*)pListenSocketData, //Listen Socket Data
0, // Init flag
NULL); // Thread address
}
NetworkHandlerThread.h
#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"
#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2
class NetworkHandlerThread
{
public:
WSADATA wsaData;
HANDLE m_CompletionPort;
HANDLE m_hListenThread;
public:
NetworkHandlerThread();
~NetworkHandlerThread();
void StartNetworkHandler();
void AddListenThread(int Port,
ConfigHandler* pConfigHandle,
void* ForwardHandle);
};
ThreadUtilities.h
#pragma once
#include <mutex>
#include "ConfigHandler.h"
using namespace std;
#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2
typedef struct _THREAD_MESSAGE
{
mutex cmd_mtx;
string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;
typedef struct _LISTEN_SOCKET_DATA
{
SOCKET Socket;
int Port;
HANDLE hAcceptEvent;
HANDLE IOCP;
VOID* ForwardMessager;
ConfigHandler* CfgHandle;
// Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;
typedef struct _CONNECTED_SOCKET_DATA
{
SOCKET Socket;
int Port;
HANDLE IOCP;
VOID* ForwardMessager;
ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;