当前位置:网站首页>任务流执行器是如何工作的?
任务流执行器是如何工作的?
2022-08-09 21:51:00 【hebiwen95】
最近在整一个 OpenAPI 编排器,想到 npm-run-all 的任务流。看了一下这个 6 年前的源码。npm-run-all[1] 是一个用来并行或者串行运行多个 npm 脚本的 CLI 工具。阅读完本文,你能收获到:
了解整个流程概览;
了解核心模块逻辑,入口分析、参数解析、任务流、任务执行等;
流程概览
直入主题,整个 npm-run-all 的整体执行流程如下:

当我们在终端敲入命令,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap 去分发不同命令不同参数的逻辑。help 和 version 比较简单,本文不做分析。任务控制方面,会先调用 npmRunAll 做参数解析,然后执行 runTasks 执行任务组中任务,全部任务执行后返回结果,结束整个流程。
入口分析
npm-run-all 包支持三条命令,我们看到源码根目录的 package.json 文件:
{
"name": "npm-run-all",
"version": "4.1.5",
"description": "A CLI tool to run multiple npm-scripts in parallel or sequential.",
"bin": {
"run-p": "bin/run-p/index.js",
"run-s": "bin/run-s/index.js",
"npm-run-all": "bin/npm-run-all/index.js"
},
"main": "lib/index.js",
"files": [
"bin",
"lib",
"docs"
],
"engines": {
"node": ">= 4"
}
}
bin 下面定义的命令脚本:
run-p,简化使用的脚本,代表并行执行脚本;
run-s,简化使用的脚本,代表串行执行脚本;
npm-run-all,复杂命令,通过 --serial 和 --parallel 参数实现前两者一样的效果。
直接看到 bin/npm-run-all/index.js:
require("../common/bootstrap")("npm-run-all")
上述代码中,如果是执行 run-p 这条命令,则函数传入的参数是 run-p,run-s 同理。bootstrap 通过参数的不同,将任务分发到 bin 下不同目录中:
.
├── common
│ ├── bootstrap.js
│ ├── parse-cli-args.js
│ └── version.js
├── npm-run-all
│ ├── help.js
│ ├── index.js
│ └── main.js
├── run-p
│ ├── help.js
│ ├── index.js
│ └── main.js
└── run-s
├── help.js
├── index.js
└── main.js
照着上述代码结构,结合 ../common/bootstrap 代码:
"use strict"
module.exports = function bootstrap(name) {
const argv = process.argv.slice(2)
switch (argv[0]) {
case undefined:
case "-h":
case "--help":
return require(`../${name}/help`)(process.stdout)
case "-v":
case "--version":
return require("./version")(process.stdout)
default:
// https://github.com/mysticatea/npm-run-all/issues/105
// Avoid MaxListenersExceededWarnings.
process.stdout.setMaxListeners(0)
process.stderr.setMaxListeners(0)
process.stdin.setMaxListeners(0)
// Main
return require(`../${name}/main`)(
argv,
process.stdout,
process.stderr
).then(
() => {
// I'm not sure why, but maybe the process never exits
// on Git Bash (MINGW64)
process.exit(0)
},
() => {
process.exit(1)
}
)
}
}
bootstrap 函数依据调用的命令调用不同目录下的 help、version 或者调用 main 函数,达到了差异消除的效果。
然后再把目光放在 process.stdout.setMaxListeners(0) 这是啥玩意?打开 issue 链接[2],通过报错信息和翻阅官方文档:
By default
EventEmitters will print a warning if more than10listeners are added for a particular event. This is a useful default that helps finding memory leaks. Theemitter.setMaxListeners()method allows the limit to be modified for this specificEventEmitterinstance. The value can be set toInfinity(or0) to indicate an unlimited number of listeners.默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告。这是一个有用的默认值,有助于发现内存泄漏。emitter.setMaxListeners() 方法允许为这个特定的 EventEmitter 实例修改限制。该值可以设置为 Infinity(或 0)以指示无限数量的侦听器。
为什么要处理这个情况呢?因为用户可能这么使用:
$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11
你永远想象不到用户会怎么使用你的工具!
参数解析
分析完不同命令的控制逻辑,我们进入核心的 npmRunAll 函数,参数解析部分逻辑如下:
module.exports = function npmRunAll(args, stdout, stderr) {
try {
const stdin = process.stdin
const argv = parseCLIArgs(args)
} catch () {
// ...
}
}

解析处理所有标准输入流参数,最终生成并返回 ArgumentSet 实例 set。parseCLIArgsCore 只看控制任务流执行的参数:
function addGroup(groups, initialValues) {
groups.push(Object.assign(
{ parallel: false, patterns: [] },
initialValues || {}
))
}
function parseCLIArgsCore(set, args) {
LOOP:
for (let i = 0; i < args.length; ++i) {
const arg = args[i]
switch (arg) {
// ...
case "-s":
case "--sequential":
case "--serial":
if (set.singleMode && arg === "-s") {
set.silent = true
break
}
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups)
break
case "-p":
case "--parallel":
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups, { parallel: true })
break
default: {
// ...
break
}
}
}
// ...
return set
}
将任务都装到 groups 数组中,如果是并行任务(传了 -p、--parallel 参数),就给任务加上 { parallel: true } 标记。默认是 { parallel: false },即串行任务。
执行任务组
在进入这一小节之前,我们就 npm-run-all 源码在 scripts 下加一条 debug 命令:
$ "node ./bin/npm-run-all/index.js lint test"
解析完参数生成的 argv.groups 如下:
[{
paralles: false,
patterns: ['lint', 'test']
}]
有了这个数组结果,我们再看任务执行流程会更加明朗。
// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
try {
// 省略解析参数
// 执行任务
const promise = argv.groups.reduce(
(prev, group) => {
// 分组中没有任务,直接返回 null
if (group.patterns.length === 0) {
return prev
}
return prev.then(() => runAll(
group.patterns, // ['lint', 'test']
{
// ……
// 是否并行执行
parallel: group.parallel,
// 并行的最大数量
maxParallel: group.parallel ? argv.maxParallel : 1,
// 一个任务失败后继续执行其他任务
// ……
arguments: argv.rest,
// 这个用于当任务以0码退出时,终止全部任务
race: group.parallel && argv.race,
//……
}
))
},
Promise.resolve(null)
)
// ...
return promise
}
catch (err) {
//eslint-disable-next-line no-console
console.error("ERROR:", err.message)
return Promise.reject(err)
}
}
上述代码解析完命令行中的参数之后,通过 reduce 拼接所有任务组的结果。任务组就是 npm-run-all 支持同时配置并行和串行的任务,并生成多个任务组。例如:
$ npm-run-all -p a b -s c d e
上述命令会生成两个任务组,并行任务组是 ['a', 'b'],串行任务组是 ['c', 'd', 'e']。就会执行两次 runAll。接下来就看看单个任务组的执行逻辑:
// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
// 省略一系列参数格式化和默认值处理……
try {
const patterns = parsePatterns(patternOrPatterns, args)
// 省略非法参数报错和参数之间的校验……
return Promise.resolve()
.then(() => {
if (taskList != null) {
return { taskList, packageInfo: null }
}
return readPackageJson()
})
.then(x => {
// 从 package.json 中匹配任务
// 中 patterns 是 ['lint', 'tests'],所以 lint 和 test 这两个任务一定要从 package.json 的 scripts 中能查看到
const tasks = matchTasks(x.taskList, patterns)
return runTasks(tasks, {
// 省略上面格式化和校验后的参数……
})
})
}
catch (err) {
return Promise.reject(new Error(err.message))
}
}
上述代码将参数的处理和校验全部省略掉了,看到核心的逻辑 matchTasks 和 runTasks。matchTasks 通过读取 package.json 下 scripts 中的命令,然后判断任务组 patterns 中的任务是否都存在于 taskList 中。
然后就来到了本小节的核心逻辑——调用 runTasks 依次执行每个任务组中的任务:
module.exports = function runTasks(tasks, options) {
return new Promise((resolve, reject) => {
// 任务组中不存在任务,直接返回空数组
if (tasks.length === 0) {
resolve([])
return
}
// 结果数组
const results = tasks.map(task => ({ name: task, code: undefined }))
// 任务队列
const queue = tasks.map((task, index) => ({ name: task, index }))
// 用于判断并行的时候任务是否全部完成
const promises = []
let error = null
let aborted = false
/**
* Done.
* @returns {void}
*/
function done() {
// ……
}
/**
* Aborts all tasks.
* @returns {void}
*/
function abort() {
// ……
}
/**
* Runs a next task.
* @returns {void}
*/
function next() {
// 任务被终止了,则不需要再往下执行了
if (aborted) {
return
}
// 并行时,只有满足 queue 和 promises 的长度都为 0,才可以判断任务组完成
if (queue.length === 0) {
if (promises.length === 0) {
done()
}
return
}
const task = queue.shift()
const promise = runTask(task.name, optionsClone)
promises.push(promise)
promise.then(
(result) => {
// 完成一个任务,就将其从 promises 中删除
remove(promises, promise)
if (aborted) {
return
}
if (result.code) {
error = new NpmRunAllError(result, results)
// 失败后不继续执行后续的任务,则直接终止整个任务组
if (!options.continueOnError) {
abort()
return
}
}
// Aborts all tasks if options.race is true.
if (options.race && !result.code) {
abort()
return
}
// Call the next task.
next()
},
(thisError) => {
remove(promises, promise)
if (!options.continueOnError || options.race) {
error = thisError
abort()
return
}
next()
}
)
}
// 最大并发数
const max = options.maxParallel
// 对比任务数量、配置的并发数、取较小者。这是为了防止配置的 maxParallel 比实际执行的任务数量还大的情况
const end = (typeof max === "number" && max > 0)
? Math.min(tasks.length, max)
: tasks.length
for (let i = 0; i < end; ++i) {
next()
}
})
}
通过队列,依次执行组中的每一条任务,任务成功后将结果存入 result,然后调用 next 执行下一个任务;可以通过 abort 终止全部任务;通过 done 完成整个队列的状态更新,并将结果返回。
串行机制
接下来,通过一张图和本小节的示例,来更好地理解串行机制:

从左到右得流程,讲解一下:
初始化时,根据任务组的 patterns,生成任务队列;
计算完任务数量 end 之后,执行 next 函数。此时会从任务队列中取出 lint 任务,调用 runTask 去执行该任务(图2所示)。(runTask 的细节放到下一小节分析。)执行完成后,会执行以下子任务:
如果配置了
aggregateOutput参数,会将任务的输出流写入到内存流;更新 result.code,如果配置了失败不继续执行(!continueOnError) 或者 race 参数,就直接调用 abort 终止整个任务队列,返回结果;
如果成功或配置失败了继续执行其他任务( continueOnError),就去任务队列中取出下一个任务(图3所示),会去执行 test 任务,重复上一步骤的逻辑。最终任务队列中没有其他任务了,此时也会执行 done 函数,结束整个任务组,并将 results 返回。
并行机制
并行机制的不同就在于初始的时候会调用多次 next 函数,并且会判断当前是否还有正在执行的任务。

上图是我们执行以下命令的流程图:
$ node ./bin/npm-run-all/index.js -p lint test --max-parallel 2
命令的意思是并行执行 lint 和 test 任务,并且最大并发数是 2。
回到上面的流程图:
初始时还是会创建一个任务队列,并将 lint 和 test 两个任务添加到队列中;
然后在首次执行时,因为我们是并发执行,所以会调用两次 next 函数,promises 数组会保存两个 promise 实例;
当 lint 任务先完成(此时 test 任务还在执行,即 test promise 还未结束),此时会再调用 next 函数。此时会判断任务队列和正在进行的任务队列是否为空,如果是的话就调用 done 返回结果,否则什么都不做,等待其他任务执行完成。
当 test 任务也完成时(假设此时 lint 任务已经完成),同样也会再次执行 next。但此时 queue 和 promises 两个数组的长度都是 0,就执行 done 逻辑,输出任务组的结果。
小结
本节我们学习了任务组中的任务不管是串行机制还是并行机制,都通过任务队列依次执行。不同的是,串行是首次只执行一次 next,并行根据参数执行多次 next。当满足队列为空并且所有任务都完成,就结束当前任务组,并将缓存在 results 中的结果返回。
单个任务如何执行
了解完任务组的串行和并行机制,这一小节就来了解单个任务是如何被执行的。
module.exports = function runTask(task, options) {
let cp = null
const promise = new Promise((resolve, reject) => {
// 包装输出、输入、错误信息流……
// 在输出流中写入任务名称
if (options.printName && stdout != null) {
stdout.write(createHeader(
task,
options.packageInfo,
options.stdout.isTTY
))
}
// 执行命令的 npm 路径,npm-cli 的路径
const npmPath = options.npmPath || process.env.npm_execpath
const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
// 执行路径,一般是全局的 bin/node 路径
const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
// 判断是不是 yarn
const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
const spawnArgs = ["run"]
if (npmPathIsJs) {
spawnArgs.unshift(npmPath)
}
if (!isYarn) {
Array.prototype.push.apply(spawnArgs, options.prefixOptions)
}
else if (options.prefixOptions.indexOf("--silent") !== -1) {
spawnArgs.push("--silent")
}
Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))
// 执行命令
cp = spawn(execPath, spawnArgs, spawnOptions)
// 省略输出流格式化……
// Register
cp.on("error", (err) => {
cp = null
reject(err)
})
cp.on("close", (code, signal) => {
cp = null
// 成功后返回任务名称、状态
resolve({ task, code, signal })
})
})
// 给当前的promise挂上静态方法,用于结束当前子进程任务
promise.abort = function abort() {
if (cp != null) {
cp.kill()
cp = null
}
}
return promise
}
runTask 做了四件事:
格式化标准输入、输出流,添加一些任务名称头部信息之类的;
获取任务的执行器,获取 npm-cli、node 等路径信息,然后拼接整个任务的执行命令;
调用封装后的 spawn 执行命令,并监听 error 和 close 事件用于返回执行结果;因为系统的不一致,所以 spawn 通过 cross-spawn 做了一层封装。
给当前任务挂上了 abort 的静态方法,用于结束当前进程;当在任务组执行 abort 方法时,实际会调用这个静态方法。
总结
有人会问为什么要去看一个 6 年前写的源码?语法老旧、甚至还能看到标记声明[3] 。但我想表达的是,npm-run-all 这个包的核心逻辑——通过任务队列去实现串行和并行的任务流模型是非常经典的,类似 continueOnError、race 这样的逻辑控制节点也随处可见。例如当前非常先进的构建系统 Turborepo[4] ,它对 pipeline 的控制逻辑也能复用这个任务流模型。
边栏推荐
- 6 rules to sanitize your code
- TF uses constant to generate data
- NIO Cup 2022 Nioke Summer Multi-School Training Camp 7 CFGJ
- 编译原理之文法
- 编程时请选择正确的输入法,严格区分中英文
- Tensorflow中使用convert_to_tensor去指定数据的类型
- PHP 二维数组根据某个字段排序
- POWER SOURCE ETA埃塔电源维修FHG24SX-U概述
- SecureCRT background color
- STC8H Development (15): GPIO Drives Ci24R1 Wireless Module
猜你喜欢

SQLi-LABS Page-2 (Adv Injections)
2.1.5 大纲显示问题

Don't tell me to play, I'm taking the PMP exam: what you need to know about choosing an institution for the PMP exam

hdu 1503 Advanced Fruits(最长公共子序列的应用)

APP自动化测试框架-UiAutomator2基础入门
![[Implementation of the interface for adding, deleting, checking, and modifying a double-linked list]](/img/49/ebedcd4d27aa608360ac17e504f36d.png)
[Implementation of the interface for adding, deleting, checking, and modifying a double-linked list]

Word怎么制作一张标准的答题卡?
![[corctf 2022] section](/img/03/ee1ead55805a2ac690ec79c675c3e6.png)
[corctf 2022] section

AI Knows Everything: Building and Deploying a Sign Language Recognition System from Zero

线段相交的应用
随机推荐
Sudoku | Backtrack-7
JS–比想象中简单
CVPR22 Oral|通过多尺度token聚合分流自注意力,代码已开源
哪款C语言编译器(IDE)适合初学者?
Word箭头上面怎么打字
“稚晖君”为2022昇腾AI创新大赛打call&nbsp;期待广大开发者加入
Converting angles to radians
Deceptive Dice(期望计算)
BulkInsert方法实现批量导入
从产品角度看 L2 应用:为什么说这是一个游乐场?
np中的round函数,ceil函数与floor函数
几种绘制时间线图的方法
同步锁synchronized追本溯源
Daily practice of PMP | Do not get lost in the exam -8.8 (including agility + multiple choice)
CVPR22 Oral | shunt through multi-scale token polymerization from attention, code is open source
Usage of placeholder function in Tensorflow
[Generic Programming] Full Detailed Explanation of Templates
STC8H开发(十五): GPIO驱动Ci24R1无线模块
6 rules to sanitize your code
论文解读(DropEdge)《DropEdge: Towards Deep Graph Convolutional Networks on Node Classification》