前言
我们在做数据处理时,通常有 同步(Synchronous)
和 异步(Asynchronous)
两种操作方式。
同步模式:就是调用方发出调用请求后,一直要等待该请求完成返回后才能继续往后执行。
异步模式:就是调用方发出调用请求后,不需要等待该请求完成返回,可以直接继续处理其他事情,
当被调用的请求执行完毕后,通过某个标志,通知调用者再回来处理返回的结果。
文件操作(OVERLAPPED)
通常情况下,我们使用 ReadFile
和 WriteFile
来进行读写操作,其函数定义如下
1 2 3 4 5 6 7
| BOOL WINAPI ReadFile( _In_ HANDLE hFile, _Out_ LPVOID lpBuffer, _In_ DWORD nNumberOfBytesToRead, _Out_opt_ LPDWORD lpNumberOfBytesRead, _Inout_opt_ LPOVERLAPPED lpOverlapped );
|
1 2 3 4 5 6 7
| BOOL WINAPI WriteFile( _In_ HANDLE hFile, _In_ LPCVOID lpBuffer, _In_ DWORD nNumberOfBytesToWrite, _Out_opt_ LPDWORD lpNumberOfBytesWritten, _Inout_opt_ LPOVERLAPPED lpOverlapped );
|
使用 OVERLAPPED
结构体,并搭配事件 EVENT
和等待函数 WaitForMultipleObjects
来实现异步处理
1 2 3 4 5 6 7 8 9 10 11 12
| typedef struct _OVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; }; PVOID Pointer; }; HANDLE hEvent; } OVERLAPPED, *LPOVERLAPPED;
|
示例代码如下,由于上限 MAXIMUM_WAIT_OBJECTS
为 64
,表示一个线程最多只能同时等待 64
个事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| #define INSTANCES 5 typedef struct _TASKINST { OVERLAPPED stOverlap; HANDLE hFile; DWORD dwSize; CHAR szBuf[64]; } TASKINST, *PTASKINST;
HANDLE g_hEvents[INSTANCES] = { 0 }; TASKINST g_stTasks[INSTANCES] = { 0 };
VOID CloseAllHandle() { for (int i = 0; i < INSTANCES; i++) { if (g_hEvents[i] != NULL) CloseHandle(g_hEvents[i]); if (g_stTasks[i].hFile != NULL) CloseHandle(g_stTasks[i].hFile); } }
BOOL OverlappedTest() { BOOL bRet = 0; DWORD dwErr = 0; for (int i = 0; i < INSTANCES; i++) { g_hEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL); if (g_hEvents[i] == NULL) { CloseAllHandle(); return FALSE; } g_stTasks[i].stOverlap.hEvent = g_hEvents[i]; sprintf_s(g_stTasks[i].szBuf, 64, "abc%d.txt", i); g_stTasks[i].hFile = CreateFileA(g_stTasks[i].szBuf, GENERIC_WRITE, FILE_SHARE_WRITE, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL); if (g_stTasks[i].hFile == INVALID_HANDLE_VALUE) { g_stTasks[i].hFile = NULL; CloseAllHandle(); return FALSE; } g_stTasks[i].dwSize = strlen(g_stTasks[i].szBuf); bRet = WriteFile(g_stTasks[i].hFile, g_stTasks[i].szBuf, g_stTasks[i].dwSize, NULL, &g_stTasks[i].stOverlap); if (!bRet) { dwErr = GetLastError(); if (dwErr != ERROR_IO_PENDING) { CloseAllHandle(); return FALSE; } } } DWORD dwRet = WaitForMultipleObjects( INSTANCES, g_hEvents, TRUE, INFINITE); if (dwRet != WAIT_OBJECT_0) { CloseAllHandle(); return FALSE; } DWORD dwRetSize = 0; for (int i = 0; i < INSTANCES; i++) { bRet = GetOverlappedResult(g_stTasks[i].hFile, &g_stTasks[i].stOverlap, &dwRetSize, FALSE); if ((!bRet) || (dwRetSize != g_stTasks[i].dwSize)) { CloseAllHandle(); return FALSE; } } CloseAllHandle(); return TRUE; }
|
通过以上的代码可以看到,如果采用同步操作,消耗的总时间是 每一次操作的时间总和
,而采用异步操作,
消耗的总时间是 花费时间最长的那一次操作的时间
,提高了I/O操作的利用率。
文件操作(APC)
另外,使用 ReadFileEx
和 WriteFileEx
可以避开最多等待 64
个事件的限制,函数定义如下
1 2 3 4 5 6 7
| BOOL WINAPI ReadFileEx( _In_ HANDLE hFile, _Out_opt_ LPVOID lpBuffer, _In_ DWORD nNumberOfBytesToRead, _Inout_ LPOVERLAPPED lpOverlapped, _In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );
|
1 2 3 4 5 6 7
| BOOL WINAPI WriteFileEx( _In_ HANDLE hFile, _In_opt_ LPCVOID lpBuffer, _In_ DWORD nNumberOfBytesToWrite, _Inout_ LPOVERLAPPED lpOverlapped, _In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );
|
这两个函数增加了一个 OVERLAPPED_COMPLETION_ROUTINE
完成函数指针,当I/O任务结束时操作系统会调用
我们指定的完成函数,这种方式称为 异步过程调用(Asynchronous Procedure Call, APC)
。函数中的参数
lpOverlapped
的成员 hEvent
没有被使用,所以我们可以用来传递一些自定义的信息。
1 2 3 4 5
| VOID CALLBACK FileIOCompletionRoutine( _In_ DWORD dwErrorCode, _In_ DWORD dwNumberOfBytesTransfered, _Inout_ LPOVERLAPPED lpOverlapped );
|
注意:Windows 不会贸然中断某个程序,然后调用提供的 Callback
函数,线程必须在所谓的 alertable
状态之下才行。如果有一个I/O操作完成,而线程不处于 alertable
状态,那么对 I/O完成函数
的调用就会
暂时被保留下来。因此,当一个线程终于进入 alertable
状态时,可能已经有大量的 APCs
等待被处理。
我们可以使用如下几个函数,使线程进入 alertable
状态:
1 2 3 4 5 6
| SleepEx(); SignalObjectAndWait(); MsgWaitForMultipleObjectsEx(); WaitForMultipleObjectsEx(); WaitForSingleObjectEx(); GetOverlappedResultEx();
|
如下为示例代码,先发起一些操作请求,然后再使线程进入 alertable
状态来处理所有 APCs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| typedef struct _TASKINST { OVERLAPPED stOverlap; HANDLE hFile; DWORD dwSize; CHAR szBuf[64]; } TASKINST, *PTASKINST;
DWORD g_dwTaskNum = 0; DWORD g_dwTaskFinish = 0;
VOID CALLBACK CompletionRoutine( DWORD dwErrorCode, DWORD dwTransfered, LPOVERLAPPED lpOverlapped) { g_dwTaskFinish++; PTASKINST pTask = (PTASKINST)lpOverlapped; if (pTask != NULL) { CloseHandle(pTask->hFile); free(pTask); } }
BOOL APCsTest() { BOOL bRet = 0; DWORD dwErr = 0; PTASKINST pTask = NULL; for (int i = 0; i < 5; i++) { pTask = (PTASKINST)malloc(sizeof(TASKINST)); if (pTask == NULL) continue; memset(pTask, 0, sizeof(TASKINST)); sprintf_s(pTask->szBuf, 64, "abc%d.txt", i); pTask->hFile = CreateFileA(pTask->szBuf, GENERIC_WRITE, FILE_SHARE_WRITE, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL); if (pTask->hFile == INVALID_HANDLE_VALUE) { free(pTask); continue; } pTask->stOverlap.hEvent = (HANDLE)i; g_dwTaskNum++; pTask->dwSize = strlen(pTask->szBuf); bRet = WriteFileEx(pTask->hFile, pTask->szBuf, pTask->dwSize, &pTask->stOverlap, CompletionRoutine); if (!bRet) { dwErr = GetLastError(); if (dwErr != ERROR_IO_PENDING) { g_dwTaskNum--; CloseHandle(pTask->hFile); free(pTask); } } } DWORD dwRet = 0; while (1) { dwRet = SleepEx(INFINITE, TRUE); if (dwRet != WAIT_IO_COMPLETION) { return FALSE; } if (g_dwTaskFinish >= g_dwTaskNum) break; } return TRUE; }
|
进过测试发现,只有把调用 ReadFileEx
的线程设置为 alertable
状态,才会处理 APC
任务,
而单独创建新线程设置 alertable
状态,不会处理 APC
任务,细节有待进一步的研究。