当前位置:网站首页>nodejs worker_threads的事件监听问题

nodejs worker_threads的事件监听问题

2022-08-11 08:48:00 jfqqqqq

注册事件的时间有效性

在new Worker之后,worker就已经立即执行了,这样一来,会出现一个问题,就是在new之后,我们注册'messge'事件时,可能worker就已经执行完了,这样一来我们注册的'message'事件就不会被触发。

验证实例 1

比如下代码:

const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");

exports.multiInsert = function (target) {
    if (isMainThread) {
        const max = 12
        const min = 1
        let primes = []

        const threadCount = +process.argv[2] || 2
        const threads = new Set()
        console.log(`Running with ${threadCount} threads...`)
        const range = Math.ceil((max - min) / threadCount)
        let start = min

        for (let i = 0; i < threadCount - 1; i++) {
            const myStart = start
            threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
            start += range
        }

        threads.add(new Worker(__filename, {
            workerData: {
                start,
                range: range + ((max - min + 1) % threadCount)
            }
        }))

        setTimeout(function () {
            for (const worker of threads) {
                const workerTmp = worker;//as Worker
                workerTmp.on('error', (err) => {
                    throw err
                });
                workerTmp.on('exit', () => {
                    threads.delete(worker)
                    console.log(`Thread exiting, ${threads.size} running...`)
                    if (threads.size === 0) {
                        // console.log(primes.join('\n'))
                    }
                })

                workerTmp.on('message', (msg) => {
                    // console.log(" workerTmp.on('message'")
                    console.log('onMessage=' + msg);
                    primes = primes.concat(msg)
                })
            }
        }, 3000);


    } else {
        target(workerData.start, workerData.range)
    }
}

let insert2DB = function (start, range) {
   
        console.log("start=" + start + ",range" + range);
        let arr = [];
        for (let i = 0; i < range; i++) {
            arr[arr.length] = start + i;
        }
        console.log("insert2DB=" + arr.toString());
        parentPort.postMessage(arr)
   
}
this.multiInsert(insert2DB);

上述代码中,有两个函数:

multiInsert()一个负责创建线程,并在子线程环境下执行insert2DB()函数。

而insert2DB()函数,就是具体的子线程的执行逻辑(执行任务)。

其中,注意的是:

1. insert2DB()函数的内容是:得出从start值开始,一次加1,放入数组,加够range次后,返回数组。

2. multiInsert中,在创建worker后,注册message事件的代码是被setTimeout包裹的,延时执行3秒。也就是说,创建完worker后,过三秒才注册监听事件。

这时候,运行代码,得到的结果就是:

只有这5行输出。 

而在message和exit事件的回调函数中的console.log并没有被触发。

为了更能说明问题,再增加一项实验,见如下代码:

验证实例 2

const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");

exports.multiInsert = function (target) {
    if (isMainThread) {
        const max = 12
        const min = 1
        let primes = []

        const threadCount = +process.argv[2] || 2
        const threads = new Set()
        console.log(`Running with ${threadCount} threads...`)
        const range = Math.ceil((max - min) / threadCount)
        let start = min

        for (let i = 0; i < threadCount - 1; i++) {
            const myStart = start
            threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
            start += range
        }

        threads.add(new Worker(__filename, {
            workerData: {
                start,
                range: range + ((max - min + 1) % threadCount)
            }
        }))

        setTimeout(function () {
            for (const worker of threads) {
                const workerTmp = worker;//as Worker
                workerTmp.on('error', (err) => {
                    throw err
                });
                workerTmp.on('exit', () => {
                    threads.delete(worker)
                    console.log(`Thread exiting, ${threads.size} running...`)
                    if (threads.size === 0) {
                        // console.log(primes.join('\n'))
                    }
                })

                workerTmp.on('message', (msg) => {
                    // console.log(" workerTmp.on('message'")
                    console.log('onMessage=' + msg);
                    primes = primes.concat(msg)
                })
            }
        }, 3000);


    } else {
        target(workerData.start, workerData.range)
    }
}

let insert2DB = function (start, range) {
    setTimeout(function () {
        console.log("start=" + start + ",range" + range);
        let arr = [];
        for (let i = 0; i < range; i++) {
            arr[arr.length] = start + i;
        }
        console.log("insert2DB=" + arr.toString());
        parentPort.postMessage(arr)
    }, 5000);
}
this.multiInsert(insert2DB);

说明:与实例1的代码不同的是,修改了insert2DB函数,把里面的内容也交给了一个setTimeout,设置延时5秒,比注册逻辑部分晚发生2秒。

再次执行,打印出运行结果

打印效果是,先输出‘Running with 2 threads...’,然后延时一会儿,输出‘start=1,range6
start=7,range6
onMessage=1,2,3,4,5,6
onMessage=7,8,9,10,11,12
insert2DB=1,2,3,4,5,6
insert2DB=7,8,9,10,11,12
Thread exiting, 1 running...
Thread exiting, 0 running...

’)

可以看到,message和exiting事件被触发了。

以上是自己实验的结果,如有错误,请大家指正!

原网站

版权声明
本文为[jfqqqqq]所创,转载请带上原文链接,感谢
https://blog.csdn.net/jfqqqqq/article/details/126221010