当前位置:网站首页>nodejs worker_threads的事件监听问题
nodejs worker_threads的事件监听问题
2022-08-11 08:48:00 【jfqqqqq】
注册事件的时间有效性
在new Worker之后,worker就已经立即执行了,这样一来,会出现一个问题,就是在new之后,我们注册'messge'事件时,可能worker就已经执行完了,这样一来我们注册的'message'事件就不会被触发。
验证实例 1
比如下代码:
const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
exports.multiInsert = function (target) {
if (isMainThread) {
const max = 12
const min = 1
let primes = []
const threadCount = +process.argv[2] || 2
const threads = new Set()
console.log(`Running with ${threadCount} threads...`)
const range = Math.ceil((max - min) / threadCount)
let start = min
for (let i = 0; i < threadCount - 1; i++) {
const myStart = start
threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
start += range
}
threads.add(new Worker(__filename, {
workerData: {
start,
range: range + ((max - min + 1) % threadCount)
}
}))
setTimeout(function () {
for (const worker of threads) {
const workerTmp = worker;//as Worker
workerTmp.on('error', (err) => {
throw err
});
workerTmp.on('exit', () => {
threads.delete(worker)
console.log(`Thread exiting, ${threads.size} running...`)
if (threads.size === 0) {
// console.log(primes.join('\n'))
}
})
workerTmp.on('message', (msg) => {
// console.log(" workerTmp.on('message'")
console.log('onMessage=' + msg);
primes = primes.concat(msg)
})
}
}, 3000);
} else {
target(workerData.start, workerData.range)
}
}
let insert2DB = function (start, range) {
console.log("start=" + start + ",range" + range);
let arr = [];
for (let i = 0; i < range; i++) {
arr[arr.length] = start + i;
}
console.log("insert2DB=" + arr.toString());
parentPort.postMessage(arr)
}
this.multiInsert(insert2DB);上述代码中,有两个函数:
multiInsert()一个负责创建线程,并在子线程环境下执行insert2DB()函数。
而insert2DB()函数,就是具体的子线程的执行逻辑(执行任务)。
其中,注意的是:
1. insert2DB()函数的内容是:得出从start值开始,一次加1,放入数组,加够range次后,返回数组。
2. multiInsert中,在创建worker后,注册message事件的代码是被setTimeout包裹的,延时执行3秒。也就是说,创建完worker后,过三秒才注册监听事件。
这时候,运行代码,得到的结果就是:

只有这5行输出。
而在message和exit事件的回调函数中的console.log并没有被触发。
为了更能说明问题,再增加一项实验,见如下代码:
验证实例 2
const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
exports.multiInsert = function (target) {
if (isMainThread) {
const max = 12
const min = 1
let primes = []
const threadCount = +process.argv[2] || 2
const threads = new Set()
console.log(`Running with ${threadCount} threads...`)
const range = Math.ceil((max - min) / threadCount)
let start = min
for (let i = 0; i < threadCount - 1; i++) {
const myStart = start
threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
start += range
}
threads.add(new Worker(__filename, {
workerData: {
start,
range: range + ((max - min + 1) % threadCount)
}
}))
setTimeout(function () {
for (const worker of threads) {
const workerTmp = worker;//as Worker
workerTmp.on('error', (err) => {
throw err
});
workerTmp.on('exit', () => {
threads.delete(worker)
console.log(`Thread exiting, ${threads.size} running...`)
if (threads.size === 0) {
// console.log(primes.join('\n'))
}
})
workerTmp.on('message', (msg) => {
// console.log(" workerTmp.on('message'")
console.log('onMessage=' + msg);
primes = primes.concat(msg)
})
}
}, 3000);
} else {
target(workerData.start, workerData.range)
}
}
let insert2DB = function (start, range) {
setTimeout(function () {
console.log("start=" + start + ",range" + range);
let arr = [];
for (let i = 0; i < range; i++) {
arr[arr.length] = start + i;
}
console.log("insert2DB=" + arr.toString());
parentPort.postMessage(arr)
}, 5000);
}
this.multiInsert(insert2DB);说明:与实例1的代码不同的是,修改了insert2DB函数,把里面的内容也交给了一个setTimeout,设置延时5秒,比注册逻辑部分晚发生2秒。
再次执行,打印出运行结果

(打印效果是,先输出‘Running with 2 threads...’,然后延时一会儿,输出‘start=1,range6
start=7,range6
onMessage=1,2,3,4,5,6
onMessage=7,8,9,10,11,12
insert2DB=1,2,3,4,5,6
insert2DB=7,8,9,10,11,12
Thread exiting, 1 running...
Thread exiting, 0 running...
’)
可以看到,message和exiting事件被触发了。
以上是自己实验的结果,如有错误,请大家指正!
边栏推荐
猜你喜欢

Continuous Integration/Continuous Deployment (2) Jenkins & SonarQube

Jupyter Notebook 插件 contrib nbextension 安装使用

无代码平台助力中山医院搭建“智慧化管理体系”,实现智慧医疗

Creo9.0 特征的成组

如何通过开源数据库管理工具 DBeaver 连接 TDengine

Initial use of IDEA

笔试题大疆08.07

C Primer Plus(6) 中文版 第1章 初识C语言 1.7 使用C语言的7个步骤

Nuget can't find the package problem

SDUT 2877: angry_birds_again_and_again
随机推荐
中国电子学会五级考点详解(一)-string类型字符串
flex布局回顾
jenkins 流水线脚本详细解析Pipeline
MySql的索引
高德能力API
Linux,Redis中IOException: 远程主机强迫关闭了一个现有的连接。解决方法
js将table生成excel文件并去除表格中的多余tr(js去除表格中空的tr标签)
基于consul的注册发现的微服务架构迁移到servicemesh
mysql添加用户以及设置权限
WiFi cfg80211
[UEFI]EFI_DEVICE_PATH_PROTOCOL 结构体初始化的一个例子
万字长文带你了解多态的底层原理,这一篇就够了
小程序组件不能修改ui组件样式
盘点四个入门级SSL证书
Go 语言的诞生
2022-08-10:为了给刷题的同学一些奖励,力扣团队引入了一个弹簧游戏机, 游戏机由 N 个特殊弹簧排成一排,编号为 0 到 N-1, 初始有一个小球在编号 0 的弹簧处。若小球在编号为 i 的弹
分门别类输入输出,Go lang1.18入门精炼教程,由白丁入鸿儒,go lang基本数据类型和输入输出EP03
第一次因没有找到iframe元素而怀疑selenium4是不是有bug?
如何通过开源数据库管理工具 DBeaver 连接 TDengine
jenkins简单使用