0%

重叠IO模型和完成端口(1)

前言

我们在做数据处理时,通常有 同步(Synchronous)异步(Asynchronous) 两种操作方式。

同步模式:就是调用方发出调用请求后,一直要等待该请求完成返回后才能继续往后执行。
异步模式:就是调用方发出调用请求后,不需要等待该请求完成返回,可以直接继续处理其他事情,
当被调用的请求执行完毕后,通过某个标志,通知调用者再回来处理返回的结果。

文件操作(OVERLAPPED)

通常情况下,我们使用 ReadFile WriteFile 来进行读写操作,其函数定义如下

1
2
3
4
5
6
7
BOOL WINAPI ReadFile(
_In_ HANDLE hFile, // 需要FILE_FLAG_OVERLAPPED权限
_Out_ LPVOID lpBuffer, // 存储读取信息的缓冲区
_In_ DWORD nNumberOfBytesToRead, // 期望读取信息的字节数
_Out_opt_ LPDWORD lpNumberOfBytesRead, // 实际读取信息的字节数
_Inout_opt_ LPOVERLAPPED lpOverlapped // 指定重叠I/O操作的结构体
);
1
2
3
4
5
6
7
BOOL WINAPI WriteFile(
_In_ HANDLE hFile, // 需要FILE_FLAG_OVERLAPPED权限
_In_ LPCVOID lpBuffer, // 存储写入信息的缓冲区
_In_ DWORD nNumberOfBytesToWrite, // 期望写入信息的字节数
_Out_opt_ LPDWORD lpNumberOfBytesWritten, // 实际写入信息的字节数
_Inout_opt_ LPOVERLAPPED lpOverlapped // 指定重叠I/O操作的结构体
);

使用 OVERLAPPED 结构体,并搭配事件 EVENT 和等待函数 WaitForMultipleObjects 来实现异步处理

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct _OVERLAPPED {
ULONG_PTR Internal; // 系统保留自用,可能是IO_STATUS_BLOCK成员
ULONG_PTR InternalHigh; // 系统保留自用,可能是IO_STATUS_BLOCK成员
union {
struct {
DWORD Offset; // 访问偏移位置,低4字节
DWORD OffsetHigh; // 访问偏移位置,高4字节
};
PVOID Pointer; // 系统保留自用
};
HANDLE hEvent; // 需要创建手动复位的事件
} OVERLAPPED, *LPOVERLAPPED;

示例代码如下,由于上限 MAXIMUM_WAIT_OBJECTS64,表示一个线程最多只能同时等待 64 个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 相关定义
#define INSTANCES 5
typedef struct _TASKINST {
OVERLAPPED stOverlap;
HANDLE hFile; // 文件的句柄
DWORD dwSize; // 读写数据大小
CHAR szBuf[64]; // 数据的缓冲区
} TASKINST, *PTASKINST;
// 全局变量
HANDLE g_hEvents[INSTANCES] = { 0 };
TASKINST g_stTasks[INSTANCES] = { 0 };
// 关闭所有句柄函数
VOID CloseAllHandle()
{
for (int i = 0; i < INSTANCES; i++)
{
if (g_hEvents[i] != NULL)
CloseHandle(g_hEvents[i]);
if (g_stTasks[i].hFile != NULL)
CloseHandle(g_stTasks[i].hFile);
}
}
// 重叠IO处理示例函数
BOOL OverlappedTest()
{
BOOL bRet = 0;
DWORD dwErr = 0;
for (int i = 0; i < INSTANCES; i++)
{
// 创建手动复位的事件
g_hEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
if (g_hEvents[i] == NULL)
{
CloseAllHandle();
return FALSE;
}
g_stTasks[i].stOverlap.hEvent = g_hEvents[i];
// 创建文件
sprintf_s(g_stTasks[i].szBuf, 64, "abc%d.txt", i);
g_stTasks[i].hFile = CreateFileA(g_stTasks[i].szBuf,
GENERIC_WRITE, FILE_SHARE_WRITE, NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (g_stTasks[i].hFile == INVALID_HANDLE_VALUE)
{
g_stTasks[i].hFile = NULL;
CloseAllHandle();
return FALSE;
}
// 发起写入任务
g_stTasks[i].dwSize = strlen(g_stTasks[i].szBuf);
bRet = WriteFile(g_stTasks[i].hFile, g_stTasks[i].szBuf,
g_stTasks[i].dwSize, NULL, &g_stTasks[i].stOverlap);
if (!bRet)
{
dwErr = GetLastError();
if (dwErr != ERROR_IO_PENDING)
{
CloseAllHandle();
return FALSE;
}
}
}
// 等待所有任务处理完毕
DWORD dwRet = WaitForMultipleObjects(
INSTANCES, g_hEvents, TRUE, INFINITE);
if (dwRet != WAIT_OBJECT_0)
{
CloseAllHandle();
return FALSE;
}
// 检查所有操作是否正常结束
DWORD dwRetSize = 0;
for (int i = 0; i < INSTANCES; i++)
{
bRet = GetOverlappedResult(g_stTasks[i].hFile,
&g_stTasks[i].stOverlap, &dwRetSize, FALSE);
if ((!bRet) || (dwRetSize != g_stTasks[i].dwSize))
{
CloseAllHandle();
return FALSE;
}
}
// 关闭所有句柄
CloseAllHandle();
return TRUE;
}

通过以上的代码可以看到,如果采用同步操作,消耗的总时间是 每一次操作的时间总和,而采用异步操作,
消耗的总时间是 花费时间最长的那一次操作的时间,提高了I/O操作的利用率。

文件操作(APC)

另外,使用 ReadFileEx WriteFileEx 可以避开最多等待 64 个事件的限制,函数定义如下

1
2
3
4
5
6
7
BOOL WINAPI ReadFileEx(
_In_ HANDLE hFile,
_Out_opt_ LPVOID lpBuffer,
_In_ DWORD nNumberOfBytesToRead,
_Inout_ LPOVERLAPPED lpOverlapped,
_In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
1
2
3
4
5
6
7
BOOL WINAPI WriteFileEx(
_In_ HANDLE hFile,
_In_opt_ LPCVOID lpBuffer,
_In_ DWORD nNumberOfBytesToWrite,
_Inout_ LPOVERLAPPED lpOverlapped,
_In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

这两个函数增加了一个 OVERLAPPED_COMPLETION_ROUTINE 完成函数指针,当I/O任务结束时操作系统会调用
我们指定的完成函数,这种方式称为 异步过程调用(Asynchronous Procedure Call, APC)。函数中的参数
lpOverlapped 的成员 hEvent 没有被使用,所以我们可以用来传递一些自定义的信息。

1
2
3
4
5
VOID CALLBACK FileIOCompletionRoutine(
_In_ DWORD dwErrorCode,
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
); // 完成函数

注意:Windows 不会贸然中断某个程序,然后调用提供的 Callback 函数,线程必须在所谓的 alertable
状态之下才行。如果有一个I/O操作完成,而线程不处于 alertable 状态,那么对 I/O完成函数 的调用就会
暂时被保留下来。因此,当一个线程终于进入 alertable 状态时,可能已经有大量的 APCs 等待被处理。
我们可以使用如下几个函数,使线程进入 alertable 状态:

1
2
3
4
5
6
SleepEx();
SignalObjectAndWait();
MsgWaitForMultipleObjectsEx();
WaitForMultipleObjectsEx();
WaitForSingleObjectEx();
GetOverlappedResultEx();

如下为示例代码,先发起一些操作请求,然后再使线程进入 alertable 状态来处理所有 APCs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 相关定义
typedef struct _TASKINST {
OVERLAPPED stOverlap;
HANDLE hFile; // 文件的句柄
DWORD dwSize; // 读写数据大小
CHAR szBuf[64]; // 数据的缓冲区
} TASKINST, *PTASKINST;
// 全局变量
DWORD g_dwTaskNum = 0; // 发起的数量
DWORD g_dwTaskFinish = 0; // 完成的数量
// APC处理的完成函数
VOID CALLBACK CompletionRoutine(
DWORD dwErrorCode, DWORD dwTransfered, LPOVERLAPPED lpOverlapped)
{
g_dwTaskFinish++; // 完成1个
PTASKINST pTask = (PTASKINST)lpOverlapped;
if (pTask != NULL)
{
CloseHandle(pTask->hFile);
free(pTask);
}
}
// APC处理的示例函数
BOOL APCsTest()
{
BOOL bRet = 0;
DWORD dwErr = 0;
PTASKINST pTask = NULL;
for (int i = 0; i < 5; i++)
{
// 申请任务内存空间
pTask = (PTASKINST)malloc(sizeof(TASKINST));
if (pTask == NULL) continue;
memset(pTask, 0, sizeof(TASKINST));
sprintf_s(pTask->szBuf, 64, "abc%d.txt", i);
// 创建文件
pTask->hFile = CreateFileA(pTask->szBuf,
GENERIC_WRITE, FILE_SHARE_WRITE, NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (pTask->hFile == INVALID_HANDLE_VALUE)
{
free(pTask);
continue;
}
// 可以使用hEvent来传递自定义数据
pTask->stOverlap.hEvent = (HANDLE)i;
// 发起写入任务
g_dwTaskNum++; // 发起1个
pTask->dwSize = strlen(pTask->szBuf);
bRet = WriteFileEx(pTask->hFile, pTask->szBuf,
pTask->dwSize, &pTask->stOverlap, CompletionRoutine);
if (!bRet)
{
dwErr = GetLastError();
if (dwErr != ERROR_IO_PENDING)
{
g_dwTaskNum--; // 失败1个
CloseHandle(pTask->hFile);
free(pTask);
}
}
}
// 循环处理所有任务
DWORD dwRet = 0;
while (1)
{
// 使线程进入alertable状态
dwRet = SleepEx(INFINITE, TRUE);
// 一个APC操作已完成
if (dwRet != WAIT_IO_COMPLETION)
{
return FALSE; // 出错
}
// 全部操作已完成
if (g_dwTaskFinish >= g_dwTaskNum) break;
}
return TRUE;
}

进过测试发现,只有把调用 ReadFileEx 的线程设置为 alertable 状态,才会处理 APC 任务,
而单独创建新线程设置 alertable 状态,不会处理 APC 任务,细节有待进一步的研究。