0%

重叠IO模型和完成端口(2)

前言

在网络操作中,也可以使用 OVERLAPPED 和 APC 的方式:一种是直接把 socket 当做文件对象来进行
读写操作;另一种是通过微软提供的 WSAxxx 系列函数来进行读写。在此之前,我们先介绍 select 模型。

网络操作(SELECT)

在 Windows 中使用 select 模型,对应的函数如下所示:

1
2
3
4
5
6
7
// Winsock select(): waits until sockets in the given sets become ready.
// Every fd_set argument is in/out (_Inout_): it is read on entry and
// rewritten on return, so callers rebuild the sets before each call.
int select(
_In_ int nfds, // ignored; present only for compatibility with UNIX
_Inout_ fd_set *readfds, // sockets checked for readability
_Inout_ fd_set *writefds, // sockets checked for writability
_Inout_ fd_set *exceptfds, // sockets checked for exceptional conditions
_In_ const struct timeval *timeout // how long to wait
);

所有操作的检查,都是通过 fd_set 结构体来处理,如下所示

1
2
3
4
// Winsock socket set: an explicit element count plus a fixed-capacity array
// of SOCKET handles. FD_SETSIZE is the capacity of one set.
typedef struct fd_set {
u_int fd_count; // number of valid entries at the start of fd_array
SOCKET fd_array[FD_SETSIZE]; // the sockets in the set
} fd_set;

结构体中的 FD_SETSIZE 默认值为 64,表示一个集合默认最多容纳 64 个套接字。这个数值可以自行修改:在包含
winsock2.h 之前 #define FD_SETSIZE 为更大的值即可。SDK 提供了几个宏定义,用来对 fd_set 结构体进行操作,如下所示:

1
2
3
4
FD_CLR(s, *set):从set中删除套接字s
FD_ISSET(s, *set):检查s是否是set集合的一名成员;如果是,则返回TRUE
FD_SET(s, *set):将套接字s加入集合set
FD_ZERO(*set):将set初始化成空集合

select 函数参数中的 readfds、writefds、exceptfds 分别指向可读、可写、异常对应的描述符集合。
这 3 个参数都是传入传出型参数:在调用 select 之前,我们通过 FD_SET 把套接字分别添加进 readfds、
writefds、exceptfds 描述符集合中,select 会监听这些套接字;返回时,select 会改写这些集合,只保留
已就绪的套接字。因此下次调用前,需要重新再添加一遍原始的套接字,所以这种方法效率
相对较低。如下为一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// select()-model demo server on TCP port 45000.
//
// Each round waits (no timeout) for activity on every tracked socket:
//   - sockets reported in an error state are closed;
//   - every writable client is sent "ok";
//   - a readable listening socket means a pending connection, which is
//     accepted while the set still has room (FD_SETSIZE);
//   - a readable client is read into a local buffer (contents unused here);
//     a failed or zero-byte recv closes it.
// A socket closed during a round is also removed from the sets that are
// still to be processed in that round, so it is never used after closesocket().
//
// Returns FALSE if setup fails or if select() itself fails (the only way the
// loop ends). On every path all sockets are closed and Winsock is released.
BOOL SelectTest()
{
    // Initialize Winsock
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        return FALSE;
    }
    // Create the server socket
    SOCKET socketSrv = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socketSrv == INVALID_SOCKET)
    {
        WSACleanup();
        return FALSE;
    }
    // Any local interface, port 45000. Zero-initialise the whole address so
    // sin_zero is not left as stack garbage.
    SOCKADDR_IN addrSrv = { 0 };
    addrSrv.sin_family = AF_INET;
    addrSrv.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    addrSrv.sin_port = htons(45000);
    // Non-blocking mode: select() only reports readiness, so if that report
    // is stale, accept()/recv()/send() fail immediately instead of stalling
    // the whole loop.
    // NOTE(review): accepted client sockets are expected to inherit this mode
    // from the listening socket - confirm against the Winsock accept() docs.
    u_long opt = 1;
    int ret = ioctlsocket(socketSrv, FIONBIO, &opt);
    if (ret != 0)
    {
        closesocket(socketSrv);
        WSACleanup();
        return FALSE;
    }
    // Bind
    ret = bind(socketSrv, (SOCKADDR*)&addrSrv, sizeof(addrSrv));
    if (ret != 0)
    {
        closesocket(socketSrv);
        WSACleanup();
        return FALSE;
    }
    // Listen
    ret = listen(socketSrv, SOMAXCONN);
    if (ret != 0)
    {
        closesocket(socketSrv);
        WSACleanup();
        return FALSE;
    }
    // Master set: the server socket now, clients as they are accepted
    fd_set allSockets;
    FD_ZERO(&allSockets);
    FD_SET(socketSrv, &allSockets);
    for (;;)
    {
        // select() rewrites the sets it is given, so start from fresh copies
        fd_set readSockets = allSockets;
        fd_set writeSockets = allSockets;
        fd_set errorSockets = allSockets;
        ret = select(0, &readSockets, &writeSockets, &errorSockets, NULL);
        if (ret == SOCKET_ERROR)
        {
            // Retrying a failing select() would spin forever; shut down instead
            break;
        }
        if (ret == 0)
        {
            continue;
        }
        // Sockets in an error state: close them and make sure the write and
        // read loops below do not touch them in this round
        for (u_int i = 0; i < errorSockets.fd_count; i++)
        {
            SOCKET s = errorSockets.fd_array[i];
            FD_CLR(s, &allSockets);
            FD_CLR(s, &readSockets);
            FD_CLR(s, &writeSockets);
            closesocket(s);
        }
        // Writable sockets
        for (u_int i = 0; i < writeSockets.fd_count; i++)
        {
            SOCKET s = writeSockets.fd_array[i];
            if (s == socketSrv)
            {
                continue; // the listening socket carries no data
            }
            ret = send(s, "ok", 2, 0);
            if (ret <= 0)
            {
                // Drop it from readSockets too: the read loop below must not use it
                FD_CLR(s, &allSockets);
                FD_CLR(s, &readSockets);
                closesocket(s);
            }
            else
            {
                // do something ...
            }
        }
        // Readable sockets
        for (u_int i = 0; i < readSockets.fd_count; i++)
        {
            SOCKET s = readSockets.fd_array[i];
            if (s == socketSrv)
            {
                // Accept the connection and track it (only while there is room)
                if (allSockets.fd_count >= FD_SETSIZE) continue;
                SOCKET socketCli = accept(socketSrv, NULL, NULL);
                if (socketCli == INVALID_SOCKET) continue;
                FD_SET(socketCli, &allSockets);
            }
            else
            {
                // Assumes a message fits in the buffer
                char rcvBuf[1024] = { 0 };
                ret = recv(s, rcvBuf, sizeof(rcvBuf), 0);
                if (ret <= 0)
                {
                    // Not removed from readSockets: that set is being iterated,
                    // and FD_CLR would shift the remaining entries
                    FD_CLR(s, &allSockets);
                    closesocket(s);
                }
                else
                {
                    // do something ...
                }
            }
        }
    }
    // Only reached when select() fails: close every socket still tracked
    // (the listening socket included) and release Winsock
    for (u_int i = 0; i < allSockets.fd_count; i++)
    {
        closesocket(allSockets.fd_array[i]);
    }
    WSACleanup();
    return FALSE;
}

网络操作(OVERLAPPED)(APC)

微软专门提供了一组 WSAxxx 网络异步操作函数,包括 WSASend、WSASendTo、WSARecv、WSARecvFrom 等。

1
2
3
4
5
6
7
8
9
// Winsock receive that can run asynchronously (one of the WSAxxx functions).
int WSARecv(
_In_ SOCKET s, // socket to receive from
_Inout_ LPWSABUF lpBuffers, // array of WSABUF buffer descriptors to fill
_In_ DWORD dwBufferCount, // number of entries in lpBuffers
_Out_ LPDWORD lpNumberOfBytesRecvd, // receives the number of bytes received
_Inout_ LPDWORD lpFlags, // receive flags (in/out)
_In_ LPWSAOVERLAPPED lpOverlapped, // WSAOVERLAPPED describing the asynchronous operation
_In_ LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // completion routine, or NULL (e.g. when completions go to an IOCP)
);

使用 WSAOVERLAPPED 结构体,并搭配 WSAEVENT 和 WSAWaitForMultipleEvents 来实现异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
// Descriptor of one overlapped operation, passed to the WSAxxx asynchronous
// functions (declaration as published in the Windows SDK).
typedef struct _WSAOVERLAPPED {
ULONG_PTR Internal; // system-internal field (per its name); not touched by the code in this article
ULONG_PTR InternalHigh; // system-internal field (per its name); not touched by the code in this article
union {
struct {
DWORD Offset; // low half of an offset (presumably unused for sockets - confirm)
DWORD OffsetHigh; // high half of that offset
};
PVOID Pointer; // shares storage with Offset/OffsetHigh (union)
};
HANDLE hEvent; // event handle for the WSAEVENT / WSAWaitForMultipleEvents style of waiting
} WSAOVERLAPPED, *LPWSAOVERLAPPED;

完成函数 WSAOVERLAPPED_COMPLETION_ROUTINE 定义如下

1
2
3
4
5
6
// Signature of the completion routine passed as lpCompletionRoutine to the
// WSAxxx functions (APC-style completion).
VOID CALLBACK WSAOVERLAPPED_COMPLETION_ROUTINE(
_In_ DWORD dwError, // error status of the finished operation
_In_ DWORD cbTransferred, // number of bytes transferred
_In_ LPWSAOVERLAPPED lpOverlapped, // the WSAOVERLAPPED the operation was started with
_In_ DWORD dwFlags // flags of the finished operation
); // completion routine

可以看到,WSA操作函数与文件操作函数结构基本一致,这里就不再提供示例代码,可以参考
https://www.cnblogs.com/HPAHPA/p/7819498.html 写的文章。

完成端口(IOCP)

前面所述的各种方法都或多或少存在一些缺陷,那么有没有比较完美的方法呢?那就是使用完成端口。
完成端口维护一个任务队列,使用少量的线程来发起任务和处理任务,既可以避免频繁切换大量线程
上下文造成的资源浪费,又不受"必须在任务发起线程中进入 alertable 状态"的限制。

创建完成端口的函数为 CreateIoCompletionPort 定义如下所示

1
2
3
4
5
6
7
// Creates an I/O completion port, or associates a handle with an existing one.
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle, // handle to associate; INVALID_HANDLE_VALUE when only creating a port
_In_opt_ HANDLE ExistingCompletionPort, // existing port to associate with; NULL creates a new port
_In_ ULONG_PTR CompletionKey, // per-handle value reported back by GetQueuedCompletionStatus
_In_ DWORD NumberOfConcurrentThreads // maximum number of threads running concurrently; commonly CPU cores * 2
// 0 means as many as there are processors in the system
// NOTE(review): the "cores * 2" rule of thumb usually sizes the pool of worker
// threads that are created, not this concurrency limit - confirm before relying on it
);

使用多线程时,还要注意全局数据读写竞争,以及数据分块读写后,如何顺序重组的问题,我们这里先
只用一个工作线程,避开这些问题,只关注完成端口的原理。获取CPU核心数的代码如下

1
2
3
4
5
6
// Return the processor count reported by GetSystemInfo
// (SYSTEM_INFO::dwNumberOfProcessors).
DWORD GetCpuCoreCount()
{
    SYSTEM_INFO sysInfo = { 0 };
    GetSystemInfo(&sysInfo);
    const DWORD processorCount = sysInfo.dwNumberOfProcessors;
    return processorCount;
}

在创建套接字时,如果使用 WSASocket 函数,就必须加上 WSA_FLAG_OVERLAPPED 标志;如果使用 socket
函数,则会自动附加这个标志,不需要再专门指定。使用方式如下所示:

1
2
// Two ways to create a TCP socket that supports overlapped I/O.
// WSASocket: WSA_FLAG_OVERLAPPED must be passed explicitly.
WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // created with WSA_FLAG_OVERLAPPED already set

接受连接请求有两个函数:分别是 accept 和 AcceptEx。在并发量比较小的时候,这两个函数效率没啥
区别;在并发量较大时,使用 AcceptEx 效率更高。需要注意的是,AcceptEx 是微软的扩展函数(需链接 Mswsock.lib,
或通过 WSAIoctl 获取函数指针),并非从 Vista 才开始提供,在 Windows 2000/XP 上同样可用(新版 MSDN 只是不再列出 Vista 之前的系统)。
为了简单起见,我们先以 accept 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <winsock2.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#pragma comment(lib,"WS2_32.lib")
// Struct definitions
// Size in bytes of the per-context data buffer
#define WSABUF_LEN 4096
// Per-client I/O context: one instance is allocated per accepted client and
// reused for that client's alternating receive/send operations.
typedef struct _IO_DATA {
OVERLAPPED Overlapped; // passed to WSARecv/WSASend; the worker maps it back to IO_DATA with CONTAINING_RECORD
SOCKET socketCli; // client socket
WSABUF wsabuf; // buffer descriptor handed to WSARecv/WSASend; wsabuf.buf points at buf below
char opCode; // 0 = read, 1 = write
char buf[WSABUF_LEN]; // data buffer
} IO_DATA, *PIO_DATA;
// Global variables
HANDLE g_hCompPort = NULL; // the I/O completion port (NULL until created; also read by the worker thread)
SOCKET g_socketSrv = INVALID_SOCKET; // listening socket (INVALID_SOCKET while not open)
// Forward declaration: WorkerThreadProc is defined further down, but
// CreateThread() below needs its address. Without a prototype, C rejects the
// name as undeclared.
DWORD WINAPI WorkerThreadProc(LPVOID lpParam);

// Release the global resources IOCPTest acquires: the completion port, the
// listening socket and the Winsock reference. Safe to call when only some of
// them were acquired, because each global is checked against its "unset"
// value first and reset to it afterwards.
static void IocpReleaseGlobals(void)
{
    if (g_hCompPort != NULL)
    {
        CloseHandle(g_hCompPort);
        g_hCompPort = NULL;
    }
    if (g_socketSrv != INVALID_SOCKET)
    {
        closesocket(g_socketSrv);
        g_socketSrv = INVALID_SOCKET;
    }
    WSACleanup();
}

// Allocate the I/O context for a newly accepted client and post its first
// overlapped WSARecv.
// Returns TRUE when the receive is pending or already completed; from then on
// the worker thread owns the context.
// Returns FALSE if allocation or WSARecv failed. In that case the context has
// already been freed, but the caller still owns (and must close) socketCli.
static BOOL IocpPostFirstRecv(SOCKET socketCli)
{
    // calloc zero-fills the whole context, including Overlapped and buf
    PIO_DATA pIoContext = (PIO_DATA)calloc(1, sizeof(IO_DATA));
    if (pIoContext == NULL)
    {
        return FALSE;
    }
    pIoContext->opCode = 0; // read operation
    pIoContext->wsabuf.buf = pIoContext->buf;
    pIoContext->wsabuf.len = WSABUF_LEN;
    pIoContext->socketCli = socketCli;

    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    int nRet = WSARecv(socketCli, &pIoContext->wsabuf, 1,
        &dwBytes, &dwFlags, &pIoContext->Overlapped, NULL);
    if (nRet != 0 && WSAGetLastError() != WSA_IO_PENDING)
    {
        free(pIoContext);
        return FALSE;
    }
    return TRUE;
}

// Completion-port test: accept TCP clients on port 45000 and hand all their
// I/O to a single worker thread (WorkerThreadProc) through g_hCompPort.
//
// Returns FALSE if setup fails (Winsock, socket, bind, listen, port creation,
// association, or worker-thread creation); everything acquired up to that
// point is released.
// The accept loop runs until g_socketSrv becomes INVALID_SOCKET; the remaining
// globals are then released and TRUE is returned.
BOOL IOCPTest()
{
    // Initialize Winsock; nothing needs releasing if this fails
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        return FALSE;
    }
    // Create the listening socket (socket() sockets support overlapped I/O)
    g_socketSrv = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (INVALID_SOCKET == g_socketSrv)
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    // Bind to any local interface, port 45000. Zero-initialise the whole
    // address so sin_zero is not left as stack garbage.
    SOCKADDR_IN addrSrv = { 0 };
    addrSrv.sin_family = AF_INET;
    addrSrv.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    addrSrv.sin_port = htons(45000);
    if (bind(g_socketSrv, (SOCKADDR*)&addrSrv, sizeof(addrSrv)) != 0)
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    // Start listening
    if (listen(g_socketSrv, SOMAXCONN) != 0)
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    // Create the completion port, allowing at most one concurrently running thread
    g_hCompPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
    if (NULL == g_hCompPort)
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    // Associate the listening socket with the port (completion key 0)
    if (NULL == CreateIoCompletionPort((HANDLE)g_socketSrv, g_hCompPort, 0, 0))
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    // Start the worker thread. Its handle is not needed afterwards, so close
    // it right away; closing the handle does not stop the thread.
    HANDLE hThread = CreateThread(NULL, 0, WorkerThreadProc, NULL, 0, NULL);
    if (NULL == hThread)
    {
        IocpReleaseGlobals();
        return FALSE;
    }
    CloseHandle(hThread);

    // Accept loop. A failure that concerns only one client (association with
    // the port, context allocation, or its first WSARecv) closes that client
    // and keeps serving the others, instead of shutting the server down.
    while (g_socketSrv != INVALID_SOCKET)
    {
        SOCKET socketCli = accept(g_socketSrv, NULL, NULL);
        if (INVALID_SOCKET == socketCli)
        {
            continue;
        }
        // Associate the client with the port, using the socket as the
        // completion key. The concurrency argument is ignored when attaching
        // to an existing port, so pass 0.
        if (NULL == CreateIoCompletionPort((HANDLE)socketCli, g_hCompPort,
                (ULONG_PTR)socketCli, 0)
            || !IocpPostFirstRecv(socketCli))
        {
            closesocket(socketCli);
        }
    }
    IocpReleaseGlobals();
    return TRUE;
}

在 Worker 线程中,最关键的函数就是 GetQueuedCompletionStatus,功能就是等待任务队列中的任务
完成,并取出已经完成的任务信息,这个函数的定义如下

1
2
3
4
5
6
7
// Waits for a completed operation on the completion port and dequeues it.
BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort, // port to wait on
_Out_ LPDWORD lpNumberOfBytes, // bytes transferred by the completed operation
_Out_ PULONG_PTR lpCompletionKey, // the key given to CreateIoCompletionPort for that handle
_Out_ LPOVERLAPPED *lpOverlapped, // the OVERLAPPED the operation was started with
_In_ DWORD dwMilliseconds // wait timeout (INFINITE waits forever)
);

如下为工作线程处理读写任务的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Close the client socket owned by an I/O context, then free the context.
// The socket is closed first so the context is never read after free().
static void WorkerReleaseContext(PIO_DATA pIoContext)
{
    closesocket(pIoContext->socketCli);
    free(pIoContext);
}

// (Re)start the overlapped operation selected by pIoContext->opCode, using
// pIoContext->wsabuf: WSARecv for 0 (read), WSASend for 1 (write).
// Returns TRUE if the operation is pending or completed immediately; in both
// cases its completion is expected on the port. This assumes
// FILE_SKIP_COMPLETION_PORT_ON_SUCCESS is not enabled, and nothing here
// enables it. Returns FALSE if the call failed outright.
static BOOL WorkerPostIo(PIO_DATA pIoContext)
{
    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    int nRet = 0;

    // Reset the OVERLAPPED before reusing it for a new operation
    memset(&pIoContext->Overlapped, 0, sizeof(OVERLAPPED));
    if (pIoContext->opCode == 0)
    {
        nRet = WSARecv(pIoContext->socketCli, &pIoContext->wsabuf, 1,
            &dwBytes, &dwFlags, &pIoContext->Overlapped, NULL);
    }
    else
    {
        nRet = WSASend(pIoContext->socketCli, &pIoContext->wsabuf, 1,
            &dwBytes, dwFlags, &pIoContext->Overlapped, NULL);
    }
    return (nRet == 0 || WSAGetLastError() == WSA_IO_PENDING) ? TRUE : FALSE;
}

// Worker thread for the completion port g_hCompPort. For each completed
// operation:
//   - read  (opCode 0): print the received bytes, then send back "200 OK";
//   - write (opCode 1): print what was sent, then post the next receive.
// A failed operation (e.g. connection reset), a zero-byte completion (the peer
// closed the connection), an unknown opCode, or a failure to post the next
// operation closes that client and frees its context. The thread then keeps
// serving the other clients.
// The thread exits when GetQueuedCompletionStatus dequeues no I/O packet:
// either the wait itself failed (e.g. the port handle was closed), or a packet
// with a NULL OVERLAPPED was posted. lpParam is unused. Always returns 0.
DWORD WINAPI WorkerThreadProc(LPVOID lpParam)
{
    (void)lpParam; // all state comes from the completion port

    for (;;)
    {
        DWORD dwIoSize = 0;
        ULONG_PTR compKey = 0; // key registered for the handle; the context carries the socket anyway
        LPOVERLAPPED lpOverlapped = NULL;
        BOOL bRet = GetQueuedCompletionStatus(g_hCompPort, &dwIoSize,
            &compKey, &lpOverlapped, INFINITE);
        if (lpOverlapped == NULL)
        {
            // No I/O packet: the port wait failed, or a stop packet was posted
            break;
        }
        PIO_DATA pIoContext = CONTAINING_RECORD(lpOverlapped, IO_DATA, Overlapped);
        if (!bRet || dwIoSize == 0)
        {
            // This client's operation failed, or the connection was closed;
            // drop only this client
            WorkerReleaseContext(pIoContext);
            continue;
        }

        if (pIoContext->opCode == 0)
        {
            // Read completed. A completely filled buffer has no NUL terminator,
            // so bound the output by the number of bytes received.
            printf("recv:%.*s\n", (int)dwIoSize, pIoContext->buf);
            // Reply on the same context
            pIoContext->opCode = 1; // write operation
            strcpy_s(pIoContext->buf, WSABUF_LEN, "200 OK");
            pIoContext->wsabuf.len = (ULONG)strlen(pIoContext->buf);
        }
        else if (pIoContext->opCode == 1)
        {
            // Write completed.
            // NOTE(review): a short send (dwIoSize < wsabuf.len) is not resent
            // here; a production server must send the remainder.
            printf("send:%s\n", pIoContext->buf);
            // Wait for the next request on the same context
            pIoContext->opCode = 0; // read operation
            memset(pIoContext->buf, 0, WSABUF_LEN);
            pIoContext->wsabuf.len = WSABUF_LEN;
        }
        else
        {
            // Unknown operation code: nothing will complete for this context again
            WorkerReleaseContext(pIoContext);
            continue;
        }

        if (!WorkerPostIo(pIoContext))
        {
            WorkerReleaseContext(pIoContext);
        }
    }
    return 0;
}

注意:这里只简单描述了完成端口工作的情况,实际在使用中还有大量的细节需要处理。
本文参考了 https://blog.csdn.net/piggyxp/article/details/6922277 的博客文章