前言
在网络操作中,也可以使用 OVERLAPPED
和 APC
的方式。一种是直接把 socket
当做文件对象来进行
读写操作。另一种是通过微软提供的 WSAxxx
系列函数来进行读写。我们这里先介绍 SELECT模型
。
网络操作(SELECT)
在WINDOWS中使用 SELECT模型
对应函数如下所示
1 2 3 4 5 6 7
| int select( _In_ int nfds, _Inout_ fd_set *readfds, _Inout_ fd_set *writefds, _Inout_ fd_set *exceptfds, _In_ const struct timeval *timeout );
|
所有操作的检查,都是通过 fd_set
结构体来处理,如下所示
1 2 3 4
| typedef struct fd_set { u_int fd_count; SOCKET fd_array[FD_SETSIZE]; } fd_set;
|
结构体中的 FD_SETSIZE
值为 64
,表示默认最大处理 64
个套接字,这个数值我们可以自行修改,
SDK提供了几个宏定义,来对 fd_set
结构体进行操作,如下所示
1 2 3 4
| FD_CLR(s, *set):从set中删除套接字s FD_ISSET(s, *set):检查s是否是set集合的一名成员;如果是,则返回TRUE FD_SET(s, *set):将套接字s加入集合set FD_ZERO(*set):将set初始化成空集合
|
select函数参数中的 readfds
writefds
exceptfds
分别指向 可读
可写 异常
对应的描述符集合。
这3个参数都是传入传出型参数,在调用select之前,我们把套接字通过 FD_SET
分别添加进 readfds
writefds
exceptfds
描述符集合中,select会监听这些套接字。当有就绪的套接字时,select会修改
对应套接字的信息,而我们在下次使用时,需要重新再添加一遍原始的套接字,所以这种方法效率
相对较低,如下为一个简单的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| BOOL SelectTest() { WSADATA wsaData; WSAStartup(MAKEWORD(2, 2), &wsaData); SOCKET socketSrv = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (socketSrv == INVALID_SOCKET) { return FALSE; } SOCKADDR_IN addrSrv; addrSrv.sin_family = AF_INET; addrSrv.sin_addr.S_un.S_addr = htonl(INADDR_ANY); addrSrv.sin_port = htons(45000); u_long opt = 1; int ret = ioctlsocket(socketSrv, FIONBIO, &opt); if (ret != 0) { closesocket(socketSrv); return FALSE; } ret = bind(socketSrv, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR)); if (ret != 0) { closesocket(socketSrv); return FALSE; } ret = listen(socketSrv, SOMAXCONN); if (ret != 0) { closesocket(socketSrv); return FALSE; } fd_set allSockets; FD_ZERO(&allSockets); FD_SET(socketSrv, &allSockets); while (1) { fd_set readSockets = allSockets; fd_set writeSockets = allSockets; fd_set errorSockets = allSockets; ret = select(0, &readSockets, &writeSockets, &errorSockets, NULL); if (ret <= 0) continue; for (u_int i = 0; i < errorSockets.fd_count; i++) { FD_CLR(errorSockets.fd_array[i], &allSockets); closesocket(errorSockets.fd_array[i]); } for (u_int i = 0; i < writeSockets.fd_count; i++) { ret = send(writeSockets.fd_array[i], "ok", 2, 0); if (ret <= 0) { FD_CLR(errorSockets.fd_array[i], &allSockets); closesocket(errorSockets.fd_array[i]); } else { } } for (u_int i = 0; i < readSockets.fd_count; i++) { if (readSockets.fd_array[i] == socketSrv) { if (allSockets.fd_count >= FD_SETSIZE) continue; SOCKET socketCli = accept(socketSrv, NULL, NULL); if (socketCli == INVALID_SOCKET) continue; FD_SET(socketCli, &allSockets); } else { char rcvBuf[1024] = { 0 }; ret = recv(readSockets.fd_array[i], rcvBuf, 1024, 0); if (ret <= 0) { FD_CLR(errorSockets.fd_array[i], &allSockets); closesocket(errorSockets.fd_array[i]); } else { } } } } closesocket(socketSrv); return TRUE; }
|
网络操作(OVERLAPPED)(APC)
微软专门提供了一组 WSAxxx
网络异步操作函数,包括 WSASend
WSASendTo
WSARecv
WSARecvFrom
等
1 2 3 4 5 6 7 8 9
| int WSARecv( _In_ SOCKET s, _Inout_ LPWSABUF lpBuffers, _In_ DWORD dwBufferCount, _Out_ LPDWORD lpNumberOfBytesRecvd, _Inout_ LPDWORD lpFlags, _In_ LPWSAOVERLAPPED lpOverlapped, _In_ LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );
|
使用 WSAOVERLAPPED
结构体,并搭配 WSAEVENT
和 WSAWaitForMultipleEvents
来实现异步处理
1 2 3 4 5 6 7 8 9 10 11 12
| typedef struct _WSAOVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; }; PVOID Pointer; }; HANDLE hEvent; } WSAOVERLAPPED, *LPWSAOVERLAPPED;
|
完成函数 WSAOVERLAPPED_COMPLETION_ROUTINE
定义如下
1 2 3 4 5 6
| VOID CALLBACK WSAOVERLAPPED_COMPLETION_ROUTINE( _In_ DWORD dwError, _In_ DWORD cbTransferred, _In_ LPWSAOVERLAPPED lpOverlapped, _In_ DWORD dwFlags );
|
可以看到,WSA操作函数与文件操作函数结构基本一致,这里就不再提供示例代码,可以参考
https://www.cnblogs.com/HPAHPA/p/7819498.html 写的文章。
完成端口(IOCP)
前边所述的各种方法,都或多或少存在一些缺陷,那么有没有比较完美的方法呢,那就是使用完成端口。
完成端口维护一个任务队列,使用少量的线程来发起任务和处理任务,既可以避免频繁的切换大量线程
上下文造成的资源浪费,又不受必须在任务发起线程设置 alertable
状态的限制。
创建完成端口的函数为 CreateIoCompletionPort
定义如下所示
1 2 3 4 5 6 7
| HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads );
|
使用多线程时,还要注意全局数据读写竞争,以及数据分块读写后,如何顺序重组的问题,我们这里先
只用一个工作线程,避开这些问题,只关注完成端口的原理。获取CPU核心数的代码如下
1 2 3 4 5 6
| DWORD GetCpuCoreCount() { SYSTEM_INFO stInfo = { 0 }; GetSystemInfo(&stInfo); return stInfo.dwNumberOfProcessors; }
|
在创建套接字时,如果使用 WSASocket
函数就,必须加上 WSA_FLAG_OVERLAPPED
标志,如果使用 socket
函数,会自动的附加这个参数,不需要再专门标出。使用方式如下所示
1 2
| WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
接受连接请求有两个函数:分别是 accept
和 AcceptEx
,在并发量比较小的时候,这两个函数效率没啥
区别,在并发量较大时,使用 AcceptEx
效率更高。需要注意的是 AcceptEx
是从 VISTA 才开始提供的,
如果要在 XP 系统下运行,就只能使用 accept
。我们先以 accept
为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| #include <winsock2.h> #pragma comment(lib,"WS2_32.lib")
#define WSABUF_LEN 4096 typedef struct _IO_DATA { OVERLAPPED Overlapped; SOCKET socketCli; WSABUF wsabuf; char opCode; char buf[WSABUF_LEN]; } IO_DATA, *PIO_DATA;
HANDLE g_hCompPort = NULL; SOCKET g_socketSrv = INVALID_SOCKET;
BOOL IOCPTest() { WSADATA wsaData; WSAStartup(MAKEWORD(2, 2), &wsaData); g_socketSrv = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == g_socketSrv) { WSACleanup(); return FALSE; } SOCKADDR_IN addrSrv; addrSrv.sin_family = AF_INET; addrSrv.sin_addr.S_un.S_addr = INADDR_ANY; addrSrv.sin_port = htons(45000); int nRet = bind(g_socketSrv, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR)); if (nRet != 0) { closesocket(g_socketSrv); WSACleanup(); return FALSE; } nRet = listen(g_socketSrv, SOMAXCONN); if (nRet != 0) { closesocket(g_socketSrv); WSACleanup(); return FALSE; } g_hCompPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 1); if (NULL == g_hCompPort) { closesocket(g_socketSrv); WSACleanup(); return FALSE; } HANDLE hTemp = CreateIoCompletionPort( (HANDLE)g_socketSrv, g_hCompPort, 0, 0); if (NULL == hTemp) { closesocket(g_socketSrv); WSACleanup(); return FALSE; } HANDLE hThread = CreateThread( NULL, 0, WorkerThreadProc, NULL, 0, NULL); if (NULL == hThread) { closesocket(g_socketSrv); WSACleanup(); return FALSE; } int nErr = 0; DWORD dwBytes = 0; DWORD dwFlags = 0; PIO_DATA pIoContext = NULL; SOCKET socketCli = NULL; while (g_socketSrv) { socketCli = accept(g_socketSrv, NULL, NULL); if (INVALID_SOCKET == socketCli) continue; hTemp = CreateIoCompletionPort( (HANDLE)socketCli, g_hCompPort, (ULONG_PTR)socketCli, 1); if (NULL == hTemp) { closesocket(socketCli); break; } pIoContext = (PIO_DATA)malloc(sizeof(IO_DATA)); if (pIoContext == NULL) { closesocket(socketCli); break; } memset(pIoContext, 0, sizeof(IO_DATA)); pIoContext->opCode = 0; pIoContext->wsabuf.buf = pIoContext->buf; pIoContext->wsabuf.len = WSABUF_LEN; pIoContext->socketCli = socketCli; dwBytes = 0, dwFlags = 0; nRet = WSARecv(socketCli, &pIoContext->wsabuf, 1, &dwBytes, &dwFlags, &pIoContext->Overlapped, NULL); if (nRet != 0) { nErr = WSAGetLastError(); if (ERROR_IO_PENDING != nErr) { free(pIoContext); closesocket(socketCli); break; } } } CloseHandle(g_hCompPort); closesocket(g_socketSrv); WSACleanup(); return TRUE; }
|
在 Worker 线程中,最关键的函数就是 GetQueuedCompletionStatus
,功能就是等待任务队列中的任务
完成,并取出已经完成的任务信息,这个函数的定义如下
1 2 3 4 5 6 7
| BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytes, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped, _In_ DWORD dwMilliseconds );
|
如下为工作线程处理读写任务的示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| DWORD WINAPI WorkerThreadProc(LPVOID lpParam) { BOOL bRet = FALSE; DWORD dwIoSize = 0; SOCKET* lpCompKey = NULL; LPOVERLAPPED lpOverlapped = NULL; PIO_DATA pIoContext = NULL; DWORD dwBytes = 0; DWORD dwFlags = 0; int nRet = 0; int nErr = 0; while (1) { bRet = GetQueuedCompletionStatus(g_hCompPort, &dwIoSize, (PULONG_PTR)&lpCompKey, (LPOVERLAPPED*)&lpOverlapped, INFINITE); if (!bRet) break; pIoContext = CONTAINING_RECORD(lpOverlapped, IO_DATA, Overlapped); if (dwIoSize == 0) { closesocket(pIoContext->socketCli); free(pIoContext); continue; } if (pIoContext->opCode == 0) { printf("recv:%s\n", pIoContext->buf); pIoContext->opCode = 1; memset(&pIoContext->Overlapped, 0, sizeof(OVERLAPPED)); strcpy_s(pIoContext->buf, WSABUF_LEN, "200 OK"); pIoContext->wsabuf.len = strlen(pIoContext->buf); dwBytes = 0, dwFlags = 0; nRet = WSASend(pIoContext->socketCli, &pIoContext->wsabuf, 1, &dwBytes, dwFlags, &pIoContext->Overlapped, NULL); if (nRet != 0) { nErr = WSAGetLastError(); if (ERROR_IO_PENDING != nErr) { free(pIoContext); closesocket(pIoContext->socketCli); continue; } } continue; } if (pIoContext->opCode == 1) { printf("send:%s\n", pIoContext->buf); pIoContext->opCode = 0; memset(&pIoContext->Overlapped, 0, sizeof(OVERLAPPED)); memset(pIoContext->buf, 0, WSABUF_LEN); pIoContext->wsabuf.len = WSABUF_LEN; dwBytes = 0, dwFlags = 0; nRet = WSARecv(pIoContext->socketCli, &pIoContext->wsabuf, 1, &dwBytes, &dwFlags, &pIoContext->Overlapped, NULL); if (nRet != 0) { nErr = WSAGetLastError(); if (ERROR_IO_PENDING != nErr) { free(pIoContext); closesocket(pIoContext->socketCli); continue; } } continue; } } return 0; }
|
注意:这里只简单描述了完成端口工作的情况,实际在使用中还有大量的细节需要处理。
本文参考了 https://blog.csdn.net/piggyxp/article/details/6922277 的博客文章