Some understanding about windows completion port (IOCP) (V)
Series Catalog
#include "StdAfx.h" #include "IOCPModel.h" #include "MainDlg.h" // How many threads are spawned on each processor (to maximize server performance, see the accompanying documentation for details) #define WORKER_THREADS_PER_PROCESSOR 2 // Number of simultaneously delivered Accept requests (this is flexible depending on the actual situation) #define MAX_POST_ACCEPT 10 // Exit signal passed to the Worker thread #define EXIT_CODE NULL // Macros for releasing pointer and handle resources // Release pointer macros #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}} // Release handle macro #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}} // Release the Socket macro #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}} CIOCPModel::CIOCPModel(void): m_nThreads(0), m_hShutdownEvent(NULL), m_hIOCompletionPort(NULL), m_phWorkerThreads(NULL), m_strIP(DEFAULT_IP), m_nPort(DEFAULT_PORT), m_pMain(NULL), m_lpfnAcceptEx( NULL ), m_pListenContext( NULL ) { } CIOCPModel::~CIOCPModel(void) { // Ensure complete release of resources this->Stop(); } /////////////////////////////////////////////////////////////////// // Worker threads: Worker threads that serve IOCP requests // That is, whenever a completion packet appears on the completion port, the thread that takes it out for processing /////////////////////////////////////////////////////////////////// DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam) { THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam; CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel; int nThreadNo = (int)pParam->nThreadNo; pIOCPModel->_ShowMessage(_T("Worker threads start,ID: %d."),nThreadNo); OVERLAPPED *pOverlapped = NULL; PER_SOCKET_CONTEXT *pSocketContext = NULL; DWORD dwBytesTransfered = 0; // Loop through requests until a Shutdown message is received while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0)) { BOOL bReturn = GetQueuedCompletionStatus( pIOCPModel->m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE); // If an exit flag is received, exit directly if ( EXIT_CODE==(DWORD)pSocketContext ) { break; } // Determine if an error has occurred if( !bReturn ) { DWORD dwErr = GetLastError(); // Display a reminder message if( !pIOCPModel->HandleError( pSocketContext,dwErr ) ) { break; } continue; } else { // Read the incoming parameters PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped); // Determine if a client has disconnected if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType)) { pIOCPModel->_ShowMessage( _T(" client side %s:%d disconnect."), inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), ntohs(pSocketContext->m_ClientAddr.sin_port) ); // Release the corresponding resources pIOCPModel->_RemoveContext( pSocketContext ); continue; } else { switch( pIoContext->m_OpType ) { // Accept case ACCEPT_POSTED: { // To increase the readability of the code, Here a special_DoAccept function to process incoming requests pIOCPModel->_DoAccpet( pSocketContext, pIoContext ); } break; // RECV case RECV_POSTED: { // To increase the readability of the code, Here a special_DoRecv function to process incoming requests pIOCPModel->_DoRecv( pSocketContext,pIoContext ); } break; // SEND // I'll skip it here., Either that or there's too much code, Not easy to understand,Send Relatively simple to operate case SEND_POSTED: { } break; default: // It shouldn't have been executed here. TRACE(_T("_WorkThread hit the target pIoContext->m_OpType Parameter anomalies. ")); break; } //switch }//if }//if }//while TRACE(_T(" Worker threads %d No. Exit. "),nThreadNo); // Release thread parameters RELEASE(lpParam); return 0; } //==================================================================================== // // System initialization and termination // //==================================================================================== //////////////////////////////////////////////////////////////////// // initializeWinSock 2.2 bool CIOCPModel::LoadSocketLib() { WSADATA wsaData; int nResult; nResult = WSAStartup(MAKEWORD(2,2), &wsaData); // mistakes( It's not usually possible to show) if (NO_ERROR != nResult) { this->_ShowMessage(_T(" initializeWinSock 2.2 failures! ")); return false; } return true; } ////////////////////////////////////////////////////////////////// // start-up server bool CIOCPModel::Start() { // Initialize thread mutexes InitializeCriticalSection(&m_csContextList); // Create event notification of system exit m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // initializeIOCP if (false == _InitializeIOCP()) { this->_ShowMessage(_T(" initializeIOCP failures! ")); return false; } else { this->_ShowMessage(_T(" IOCP Initialization complete .")); } // initializeSocket if( false==_InitializeListenSocket() ) { this->_ShowMessage(_T("Listen Socket Initialization failure! ")); this->_DeInitialize(); return false; } else { this->_ShowMessage(_T("Listen Socket Initialization complete.")); } this->_ShowMessage(_T(" System ready, Waiting for connection.... ")); return true; } //////////////////////////////////////////////////////////////////// // Start sending system exit messages, Exit completion port and thread resources void CIOCPModel::Stop() { if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET ) { // Activate to turn off message notifications SetEvent(m_hShutdownEvent); for (int i = 0; i < m_nThreads; i++) { // Notify all completed port operations of exit PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL); } // Wait for all client resources to exit WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE); // Clear client list information this->_ClearContextList(); // Release of other resources this->_DeInitialize(); this->_ShowMessage(_T(" Stop Listening ")); } } //////////////////////////////// // Initialization completion port bool CIOCPModel::_InitializeIOCP() { // Create the first completion port m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( NULL == m_hIOCompletionPort) { this->_ShowMessage(_T(" Failed to establish completion port! error code: %d! "), WSAGetLastError()); return false; } // Depending on the number of processors in this machine, Create the corresponding number of threads m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors(); // Initializing handles for worker threads m_phWorkerThreads = new HANDLE[m_nThreads]; // Create worker threads based on the calculated number DWORD nThreadID; for (int i = 0; i < m_nThreads; i++) { THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER; pThreadParams->pIOCPModel = this; pThreadParams->nThreadNo = i+1; m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID); } TRACE(" establish _WorkerThread %d size. ", m_nThreads ); return true; } ///////////////////////////////////////////////////////////////// // initializeSocket bool CIOCPModel::_InitializeListenSocket() { // AcceptEx harmony GetAcceptExSockaddrs ofGUID, For exporting function pointers GUID GuidAcceptEx = WSAID_ACCEPTEX; GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; // Server Address Information, For bindingSocket struct sockaddr_in ServerAddress; // Generate theSocket information on m_pListenContext = new PER_SOCKET_CONTEXT; // Need to use overlapIO, Must be usedWSASocket come to establishSocket, Only then can overlap be supportedIO operations m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == m_pListenContext->m_Socket) { this->_ShowMessage(_T(" initializeSocket failures, error code: %d. "), WSAGetLastError()); return false; } else { TRACE(_T("WSASocket() accomplish. ")); } // willListen Socket Bind to the completion port in if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort, (DWORD)m_pListenContext, 0)) { this->_ShowMessage(_T(" bind Listen Socket To completion port failure! error code: %d/n"), WSAGetLastError()); RELEASE_SOCKET( m_pListenContext->m_Socket ); return false; } else { TRACE(_T("Listen Socket Binding completion port accomplish. ")); } // Populate address information ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress)); ServerAddress.sin_family = AF_INET; // Here you can bind any availableIP address, Or bind a specifiedIP address ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY); //ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString()); ServerAddress.sin_port = htons(m_nPort); // Binding address and port if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress))) { this->_ShowMessage(_T("bind() Function execution error. ")); return false; } else { TRACE(_T("bind() accomplish. ")); } // Start Listening if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN)) { this->_ShowMessage(_T("Listen() An error occurred in the execution of the function. ")); return false; } else { TRACE(_T("Listen() accomplish. ")); } // Use the AcceptEx function, as this is a separate extension provided by Microsoft that is outside the WinSock2 specification // So you need to get a little extra pointer to the function. // Get the pointer to the AcceptEx function DWORD dwBytes = 0; if(SOCKET_ERROR == WSAIoctl( m_pListenContext->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL)) { this->_ShowMessage(_T("WSAIoctl Failure to obtainAcceptEx Function Pointer。 error code: %d "), WSAGetLastError()); this->_DeInitialize(); return false; } // Get the pointer to the GetAcceptExSockAddrs function, same thing if(SOCKET_ERROR == WSAIoctl( m_pListenContext->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs, sizeof(GuidGetAcceptExSockAddrs), &m_lpfnGetAcceptExSockAddrs, sizeof(m_lpfnGetAcceptExSockAddrs), &dwBytes, NULL, NULL)) { this->_ShowMessage(_T("WSAIoctl Failure to obtainGuidGetAcceptExSockAddrs Function Pointer。 error code: %d "), WSAGetLastError()); this->_DeInitialize(); return false; } // Prepare parameters for the AcceptEx and then cast the AcceptEx I/O request for( int i=0;i<MAX_POST_ACCEPT;i++ ) { // Create a new IO_CONTEXT PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext(); if( false==this->_PostAccept( pAcceptIoContext ) ) { m_pListenContext->RemoveContext(pAcceptIoContext); return false; } } this->_ShowMessage( _T(" deliver %d sizeAcceptEx Request completed"),MAX_POST_ACCEPT ); return true; } ////////////////////////////////////////////////////////////
Due to the limited word count of the public article, you can continue reading the next article:Some understanding about windows completion port (IOCP) (VI)
Series Catalog