之前发过帖子
里面提到一点,emacs 在 windows 上使用了 WaitForMultipleObjects/MsgWaitForMultipleObjects 来模拟实现 sys_select ,而 WaitForMultipleObjects/MsgWaitForMultipleObjects 监听的句柄不能超过64个,导致打开的子进程/套接字不能超过32个。
大家自己也可以试一下,用如下的代码连续打开100个子进程,会报错,而且能打开的进程应该不超过30个。
(defun create-ping-process (index)
"Create a ping process for a given INDEX."
(let ((process-name (format "ping-process-%d" index))
(buffer-name (format "*ping-output-%d*" index))
(host "127.0.0.1"))
(start-process process-name
buffer-name
"ping" "-c" "4" host)))
(defun create-multiple-ping-processes (count)
"Create COUNT ping processes."
(dotimes (i count)
(create-ping-process i)))
(create-multiple-ping-processes 100)
这周末有空,尝试用多线程模拟实现 WaitForMultipleObjects/MsgWaitForMultipleObjects。 最终的效果如下图(如果失败会报错,或者buffer里面为空,我这里执行的ping命令在win上语法错误,这个不重要,点进去可以看到输出,进程已经成功创建并执行)
目前实现的效果是可以创建最多4096个子进程/套接字。 对于少量的进程创建,没有额外开销,只有创建超过63个子进程才有额外的线程创建开销。
欢迎大家测试,有问题可以反馈给我 目前是基于github仓库 commit 08e38818f6ff4e514ac291bc5a7686f4390759b0 做的修改
编译时指定了宏 -DFD_SETSIZE=4096
git diff如下
diff --git a/src/w32.h b/src/w32.h
index cf470ae9901..0340f10e62b 100644
--- a/src/w32.h
+++ b/src/w32.h
@@ -29,7 +29,7 @@ along with GNU Emacs. If not, see <https://www.gnu.org/licenses/>. */
/* File descriptor set emulation. */
/* MSVC runtime library has limit of 64 descriptors by default */
-#define FD_SETSIZE 64
+#define FD_SETSIZE 4096
typedef struct {
unsigned int bits[FD_SETSIZE / 32];
} fd_set;
diff --git a/src/w32proc.c b/src/w32proc.c
index 40181e09830..60918cb4a41 100644
--- a/src/w32proc.c
+++ b/src/w32proc.c
@@ -62,6 +62,215 @@ along with GNU Emacs. If not, see <https://www.gnu.org/licenses/>. */
#include "syssignal.h"
#include "w32term.h"
#include "coding.h"
+#include <windows.h>
+
+#define MY_MAXIMUM_WAIT_OBJECTS 4096
+// 最大等待对象数,额外的一个用于线程退出event
+#define MY_MAX_WAIT_OBJECTS 63
+#define MY_WAIT_TIMEOUT 0x9999
+#define MY_WAIT_FAILED 0x9998
+#define MY_WAIT_ABANDONED_0 0x5000
+
+
+// 线程参数结构
+typedef struct {
+ HANDLE *handles;
+ int count;
+ BOOL bWaitAll;
+ DWORD dwMilliseconds;
+ HANDLE completionEvent; // 用于通知主线程该线程的WaitForMultipleObjects已完成
+ DWORD *threadResult; // 获取线程调用WaitForMultipleObjects的返回值
+ HANDLE exitEvent; // 用于控制线程退出
+} WaitForThreadData;
+
+// 线程函数,等待一组句柄
+DWORD WINAPI WaitForThreadProc(LPVOID lpParam) {
+ WaitForThreadData *params = (WaitForThreadData *)lpParam;
+ HANDLE *allHandles = (HANDLE *)malloc((params->count + 1) * sizeof(HANDLE));
+ memcpy(allHandles, params->handles, params->count * sizeof(HANDLE));
+ allHandles[params->count] = params->exitEvent;
+
+ DWORD result = WaitForMultipleObjects(params->count + 1, allHandles, params->bWaitAll, params->dwMilliseconds);
+ if (result == WAIT_OBJECT_0 + params->count) {
+ // exitEvent 被触发,线程退出
+ result = WAIT_TIMEOUT;
+ } else {
+ *params->threadResult = result;
+ }
+
+ // 通知主线程该组已完成
+ SetEvent(params->completionEvent);
+ free(allHandles);
+ return result;
+}
+// 扩展的 WaitForMultipleObjects 函数 ,注意返回值和原版不一样
+DWORD WaitForMultipleObjectsCustom(DWORD nCount, CONST HANDLE *lpHandles, BOOL bWaitAll, DWORD dwMilliseconds) {
+ // emacs sys_select 中调用 WaitForMultipleObjects 时 bWaitAll 参数都为 FALSE
+ bWaitAll = FALSE;
+ if (nCount <= 64) {
+ // 句柄数未超过限制,直接调用 WaitForMultipleObjects
+ DWORD result = WaitForMultipleObjects(nCount, lpHandles, bWaitAll, dwMilliseconds);
+ if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + nCount) {
+ return result - WAIT_OBJECT_0;
+ } else if (WAIT_TIMEOUT == result) {
+ return MY_WAIT_TIMEOUT;
+ } else if (result >= WAIT_ABANDONED_0 && result < WAIT_ABANDONED_0 + nCount) {
+ return result - WAIT_ABANDONED_0 + MY_WAIT_ABANDONED_0;
+ } else {
+ return MY_WAIT_FAILED;
+ }
+ }
+
+ // 将句柄分组,每组最多 MY_MAX_WAIT_OBJECTS 个句柄
+ int numGroups = (nCount + MY_MAX_WAIT_OBJECTS - 1) / MY_MAX_WAIT_OBJECTS;
+ HANDLE *groupCompletionEvents = (HANDLE *)malloc(numGroups * sizeof(HANDLE));
+ WaitForThreadData *threadParams = (WaitForThreadData *)malloc(numGroups * sizeof(WaitForThreadData));
+ HANDLE *threads = (HANDLE *)malloc(numGroups * sizeof(HANDLE));
+ DWORD *threadResults = (DWORD *)malloc(numGroups * sizeof(DWORD));
+ HANDLE *exitEvents = (HANDLE *)malloc(numGroups * sizeof(HANDLE)); // 用于控制线程退出
+ DWORD startTime = GetTickCount();
+ DWORD elapsedTime = 0;
+
+ for (int i = 0; i < numGroups; ++i) {
+ int groupCount = (i == numGroups - 1) ? (nCount - i * MY_MAX_WAIT_OBJECTS) : MY_MAX_WAIT_OBJECTS;
+ threadParams[i].handles = (HANDLE *)(lpHandles + i * MY_MAX_WAIT_OBJECTS);
+ threadParams[i].count = groupCount;
+ threadParams[i].bWaitAll = bWaitAll;
+ threadParams[i].dwMilliseconds = dwMilliseconds;
+ threadParams[i].completionEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ threadParams[i].threadResult = &threadResults[i];
+ threadParams[i].exitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ groupCompletionEvents[i] = threadParams[i].completionEvent;
+ exitEvents[i] = threadParams[i].exitEvent;
+
+ threads[i] = CreateThread(NULL, 0, WaitForThreadProc, &threadParams[i], 0, NULL);
+ }
+
+ // 等待任一组完成
+ DWORD result;
+ if (dwMilliseconds != INFINITE) {
+ DWORD remainingTime = dwMilliseconds - elapsedTime;
+ result = WaitForMultipleObjects(numGroups, groupCompletionEvents, bWaitAll, remainingTime);
+ } else {
+ result = WaitForMultipleObjects(numGroups, groupCompletionEvents, bWaitAll, dwMilliseconds);
+ }
+
+ // 通知所有线程退出
+ for (int i = 0; i < numGroups; ++i) {
+ SetEvent(exitEvents[i]); // 新增
+ }
+
+ // 获取结果
+ if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + numGroups) {
+ DWORD inner_result = threadResults[result - WAIT_OBJECT_0];
+ if (inner_result >= WAIT_OBJECT_0 && inner_result < WAIT_OBJECT_0 + threadParams[result - WAIT_OBJECT_0].count) {
+ result = inner_result + (result - WAIT_OBJECT_0) * MY_MAX_WAIT_OBJECTS;
+ } else if (inner_result >= WAIT_ABANDONED_0 && inner_result < WAIT_ABANDONED_0 + MY_MAX_WAIT_OBJECTS) {
+ result = inner_result - WAIT_ABANDONED_0 + MY_WAIT_ABANDONED_0;
+ } else if (WAIT_TIMEOUT == inner_result) {
+ result = MY_WAIT_TIMEOUT;
+ } else {
+ result = MY_WAIT_FAILED;
+ }
+ } else if (WAIT_TIMEOUT == result) {
+ result = MY_WAIT_TIMEOUT;
+ } else if (result >= WAIT_ABANDONED_0 && result < WAIT_ABANDONED_0 + numGroups) {
+ result = result - WAIT_ABANDONED_0 + MY_WAIT_ABANDONED_0 ;
+ } else {
+ result = MY_WAIT_FAILED;
+ }
+
+ // 等待所有线程结束
+ WaitForMultipleObjects(numGroups, threads, TRUE, INFINITE);
+
+ // 关闭句柄和释放资源
+ for (int i = 0; i < numGroups; ++i) {
+ CloseHandle(threads[i]);
+ CloseHandle(groupCompletionEvents[i]);
+ CloseHandle(exitEvents[i]); // 新增
+ }
+
+ free(groupCompletionEvents);
+ free(threadParams);
+ free(threads);
+ free(threadResults);
+ free(exitEvents); // 新增
+
+ return result;
+}
+
+typedef struct {
+ DWORD nCount;
+ HANDLE* lpHandles;
+ BOOL bWaitAll;
+ DWORD dwMilliseconds;
+ HANDLE event;
+ HANDLE exitEvent;
+ DWORD threadResult;
+} MsgWaitThreadData;
+
+DWORD WINAPI MsgWaitThreadFunction(LPVOID param) {
+ MsgWaitThreadData* data = (MsgWaitThreadData*)param;
+ HANDLE *allHandles = (HANDLE *)malloc((data->nCount + 1) * sizeof(HANDLE));
+ memcpy(allHandles, data->lpHandles, data->nCount * sizeof(HANDLE));
+ allHandles[data->nCount] = data->exitEvent;
+ DWORD result = WaitForMultipleObjectsCustom(data->nCount+1, allHandles, data->bWaitAll, data->dwMilliseconds);
+ // printf("WaitForMultipleObjectsCustom result: %lu\n", result);
+ if (result == WAIT_OBJECT_0 + data->nCount) {
+ // exitEvent 被触发,线程退出
+ data->threadResult = MY_WAIT_FAILED;
+ } else {
+ data->threadResult = result;
+ }
+
+ // 通知主线程该组已完成
+ SetEvent(data->event);
+ free(allHandles);
+
+ return 0;
+}
+
+DWORD MsgWaitForMultipleObjectsCustom(DWORD nCount, HANDLE* lpHandles, BOOL bWaitAll, DWORD dwMilliseconds, DWORD dwWakeMask) {
+ // emacs sys_select 中调用 MsgWaitForMultipleObjects 时 bWaitAll 参数都为 FALSE
+ bWaitAll = FALSE;
+ HANDLE event = CreateEvent(NULL, FALSE, FALSE, NULL); // completeEvent
+ if (event == NULL) {
+ return MY_WAIT_FAILED;
+ }
+ HANDLE exitEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (event == NULL) {
+ return MY_WAIT_FAILED;
+ }
+
+ MsgWaitThreadData data = { nCount, lpHandles, bWaitAll, dwMilliseconds, event, exitEvent, MY_WAIT_FAILED };
+ HANDLE thread = CreateThread(NULL, 0, MsgWaitThreadFunction, &data, 0, NULL);
+ if (thread == NULL) {
+ CloseHandle(event);
+ return MY_WAIT_FAILED;
+ }
+
+ DWORD result = MsgWaitForMultipleObjects(1, &event, FALSE, dwMilliseconds, dwWakeMask);
+ SetEvent(exitEvent);
+ // printf("MsgWaitForMultipleObjects result: %lu\n", result);
+ if (result == WAIT_OBJECT_0) {
+ result = data.threadResult;
+ } else if (result == WAIT_OBJECT_0 + 1) {
+ // 消息队列中有消息
+ result = WAIT_OBJECT_0 + nCount; // 表示消息队列有消息
+ } else if (result == WAIT_TIMEOUT) {
+ result = MY_WAIT_TIMEOUT;
+ } else {
+ result = MY_WAIT_FAILED; // 直接返回结果
+ }
+
+ WaitForSingleObject(thread, INFINITE);
+
+ CloseHandle(exitEvent);
+ CloseHandle(event);
+ CloseHandle(thread);
+ return result;
+}
+
void w32_raise (int);
@@ -1566,15 +1775,15 @@ waitpid (pid_t pid, int *status, int options)
quitting in that case. */
if (!dont_wait)
maybe_quit ();
- active = WaitForMultipleObjects (nh, wait_hnd, FALSE, timeout_ms);
- } while (active == WAIT_TIMEOUT && !dont_wait);
+ active = WaitForMultipleObjectsCustom (nh, wait_hnd, FALSE, timeout_ms);
+ } while (active == MY_WAIT_TIMEOUT && !dont_wait);
- if (active == WAIT_FAILED)
+ if (active == MY_WAIT_FAILED)
{
errno = EBADF;
return -1;
}
- else if (active == WAIT_TIMEOUT && dont_wait)
+ else if (active == MY_WAIT_TIMEOUT && dont_wait)
{
/* PID specifies our subprocess, but it didn't exit yet, so its
status is not yet available. */
@@ -1584,14 +1793,14 @@ waitpid (pid_t pid, int *status, int options)
return 0;
}
else if (active >= WAIT_OBJECT_0
- && active < WAIT_OBJECT_0+MAXIMUM_WAIT_OBJECTS)
+ && active < WAIT_OBJECT_0+MY_MAXIMUM_WAIT_OBJECTS)
{
active -= WAIT_OBJECT_0;
}
- else if (active >= WAIT_ABANDONED_0
- && active < WAIT_ABANDONED_0+MAXIMUM_WAIT_OBJECTS)
+ else if (active >= MY_WAIT_ABANDONED_0
+ && active < MY_WAIT_ABANDONED_0+MY_MAXIMUM_WAIT_OBJECTS)
{
- active -= WAIT_ABANDONED_0;
+ active -= MY_WAIT_ABANDONED_0;
}
else
emacs_abort ();
@@ -2500,13 +2709,15 @@ sys_select (int nfds, SELECT_TYPE *rfds, SELECT_TYPE *wfds, SELECT_TYPE *efds,
/* Wait for input or child death to be signaled. If user input is
allowed, then also accept window messages. */
- if (FD_ISSET (0, &orfds))
- active = MsgWaitForMultipleObjects (nh + nc, wait_hnd, FALSE, timeout_ms,
- QS_ALLINPUT);
+ if (FD_ISSET (0, &orfds)){
+ active = MsgWaitForMultipleObjectsCustom (nh + nc, wait_hnd, FALSE, timeout_ms,
+ QS_ALLINPUT);
+ }
+
else
- active = WaitForMultipleObjects (nh + nc, wait_hnd, FALSE, timeout_ms);
+ active = WaitForMultipleObjectsCustom (nh + nc, wait_hnd, FALSE, timeout_ms);
- if (active == WAIT_FAILED)
+ if (active == MY_WAIT_FAILED)
{
DebPrint (("select.WaitForMultipleObjects (%d, %lu) failed with %lu\n",
nh + nc, timeout_ms, GetLastError ()));
@@ -2517,7 +2728,7 @@ sys_select (int nfds, SELECT_TYPE *rfds, SELECT_TYPE *wfds, SELECT_TYPE *efds,
errno = EINTR;
return -1;
}
- else if (active == WAIT_TIMEOUT)
+ else if (active == MY_WAIT_TIMEOUT)
{
if (noninteractive)
{
@@ -2527,14 +2738,14 @@ sys_select (int nfds, SELECT_TYPE *rfds, SELECT_TYPE *wfds, SELECT_TYPE *efds,
return 0;
}
else if (active >= WAIT_OBJECT_0
- && active < WAIT_OBJECT_0+MAXIMUM_WAIT_OBJECTS)
+ && active < WAIT_OBJECT_0+MY_MAXIMUM_WAIT_OBJECTS)
{
active -= WAIT_OBJECT_0;
}
- else if (active >= WAIT_ABANDONED_0
- && active < WAIT_ABANDONED_0+MAXIMUM_WAIT_OBJECTS)
+ else if (active >= MY_WAIT_ABANDONED_0
+ && active < MY_WAIT_ABANDONED_0+MY_MAXIMUM_WAIT_OBJECTS)
{
- active -= WAIT_ABANDONED_0;
+ active -= MY_WAIT_ABANDONED_0;
}
else
emacs_abort ();
如果大家自己想测试的话可以用这个老哥的脚本
下面是我编译好的版本
链接:https://pan.baidu.com/s/1riNj0T4awuYffKaGDVVCow?pwd=e3zt
提取码:e3zt