How do task flow executors work?
2022-08-09 23:44:00 【hebiwen95】
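I have recently been building an OpenAPI orchestrator, which reminded me of the task flow in npm-run-all, so I took a look at its source code, written six years ago. npm-run-all[1] is a CLI tool that runs multiple npm scripts in parallel or in series. After reading this article, you will:
Understand the overall execution flow;
Understand the logic of the core modules: entry point analysis, argument parsing, the task flow, and task execution.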
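Process overview
Getting straight to the point, the overall execution flow of npm-run-all is as follows:
When we type a command in the terminal, it actually runs bin/xx/index.js, which calls bootstrap to dispatch to different logic depending on the command-line arguments. help and version are simple, so this article does not analyze them. For task control, npmRunAll is called first to parse the arguments, then runTasks runs the tasks in each task group. Once all tasks have finished and returned their results, the whole flow ends.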
Entry point analysis
The npm-run-all package provides three commands. Look at the package.json file in the source root:
{
"name": "npm-run-all",
"version": "4.1.5",
"description": "A CLI tool to run multiple npm-scripts in parallel or sequential.",
"bin": {
"run-p": "bin/run-p/index.js",
"run-s": "bin/run-s/index.js",
"npm-run-all": "bin/npm-run-all/index.js"
},
"main": "lib/index.js",
"files": [
"bin",
"lib",
"docs"
],
"engines": {
"node": ">= 4"
}
}
The bin field defines these command scripts:
run-p, a shorthand that runs scripts in parallel;
run-s, a shorthand that runs scripts in series;
npm-run-all, the full command, which achieves the same effect as the first two through the --serial and --parallel options.
Look directly at bin/npm-run-all/index.js:
require("../common/bootstrap")("npm-run-all")
In the code above, if the run-p command is being executed, the argument passed to the function is run-p instead, and likewise for run-s. Based on this argument, bootstrap dispatches the task to the matching directory under bin:
.
├── common
│ ├── bootstrap.js
│ ├── parse-cli-args.js
│ └── version.js
├── npm-run-all
│ ├── help.js
│ ├── index.js
│ └── main.js
├── run-p
│ ├── help.js
│ ├── index.js
│ └── main.js
└── run-s
    ├── help.js
    ├── index.js
    └── main.js
With that directory structure in mind, here is the code in ../common/bootstrap:
"use strict"
module.exports = function bootstrap(name) {
const argv = process.argv.slice(2)
switch (argv[0]) {
case undefined:
case "-h":
case "--help":
return require(`../${name}/help`)(process.stdout)
case "-v":
case "--version":
return require("./version")(process.stdout)
default:
// https://github.com/mysticatea/npm-run-all/issues/105
// Avoid MaxListenersExceededWarnings.
process.stdout.setMaxListeners(0)
process.stderr.setMaxListeners(0)
process.stdin.setMaxListeners(0)
// Main
return require(`../${name}/main`)(
argv,
process.stdout,
process.stderr
).then(
() => {
// I'm not sure why, but maybe the process never exits
// on Git Bash (MINGW64)
process.exit(0)
},
() => {
process.exit(1)
}
)
}
}
bootstrap calls help, version, or the main function in the directory that matches the invoked command, which hides the differences between the three commands.
Next, look at process.stdout.setMaxListeners(0). What is that for? Opening the issue link[2] and reading the error message together with the official documentation:
By default EventEmitters will print a warning if more than 10 listeners are added for a particular event. This is a useful default that helps finding memory leaks. The emitter.setMaxListeners() method allows the limit to be modified for this specific EventEmitter instance. The value can be set to Infinity (or 0) to indicate an unlimited number of listeners.
Why handle this case? Because a user might run something like this:
$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11
You can never imagine how users will use your tools!
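To see the limit that setMaxListeners(0) lifts, here is a minimal sketch using Node's events module. The emitter and the "data" event name are made up for illustration and are not part of npm-run-all.

```javascript
const { EventEmitter } = require("events")

const emitter = new EventEmitter()
// 10, unless EventEmitter.defaultMaxListeners was changed elsewhere
const defaultLimit = emitter.getMaxListeners()

// 0 (or Infinity) removes the limit, so adding an 11th listener prints no warning
emitter.setMaxListeners(0)
for (let i = 0; i < 11; ++i) {
    emitter.on("data", () => {})
}
const listenerCount = emitter.listenerCount("data")

console.log(defaultLimit, emitter.getMaxListeners(), listenerCount)
```

Without the setMaxListeners(0) call, the eleventh listener would trigger a MaxListenersExceededWarning, which is the warning the issue reports.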
Argument parsing
Having analyzed how the different commands are dispatched, we reach the core npmRunAll function. Its argument-parsing logic looks like this:
module.exports = function npmRunAll(args, stdout, stderr) {
try {
const stdin = process.stdin
const argv = parseCLIArgs(args)
} catch (err) {
// ...
}
}
parseCLIArgs parses all the arguments passed on the command line and returns the result as an ArgumentSet instance, set. In parseCLIArgsCore, we look only at the options that control task flow execution:
function addGroup(groups, initialValues) {
groups.push(Object.assign(
{ parallel: false, patterns: [] },
initialValues || {}
))
}
function parseCLIArgsCore(set, args) {
LOOP:
for (let i = 0; i < args.length; ++i) {
const arg = args[i]
switch (arg) {
// ...
case "-s":
case "--sequential":
case "--serial":
if (set.singleMode && arg === "-s") {
set.silent = true
break
}
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups)
break
case "-p":
case "--parallel":
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups, { parallel: true })
break
default: {
// ...
break
}
}
}
// ...
return set
}
Tasks are collected in the groups array. For parallel tasks (the -p or --parallel option was passed), the group is marked with { parallel: true }. The default is { parallel: false }, a serial group.
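The grouping can be tried in isolation with a simplified sketch. parseGroups is a hypothetical stand-in for parseCLIArgsCore that handles only the serial and parallel switches, and it assumes parsing starts with one empty serial group; the excerpt above does not show how the first group is created.

```javascript
// Simplified stand-in for parseCLIArgsCore: only the -s / -p style options are handled.
function parseGroups(args) {
    // Assumption: parsing starts with one empty serial group.
    const groups = [{ parallel: false, patterns: [] }]
    for (const arg of args) {
        switch (arg) {
            case "-s":
            case "--sequential":
            case "--serial":
                groups.push({ parallel: false, patterns: [] })
                break
            case "-p":
            case "--parallel":
                groups.push({ parallel: true, patterns: [] })
                break
            default:
                // Anything else is treated as a task name for the latest group.
                groups[groups.length - 1].patterns.push(arg)
        }
    }
    return groups
}

const groups = parseGroups(["-p", "a", "b", "-s", "c", "d", "e"])
console.log(JSON.stringify(groups))
```

Under that assumption the leading group stays empty whenever the command starts with a switch; an empty group does no harm as long as the caller skips groups without patterns.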
Running a task group
Before getting into this section, let's add a debug command under scripts in the npm-run-all source:
$ "node ./bin/npm-run-all/index.js lint test"
Parsing the arguments produces the following argv.groups:
[{
parallel: false,
patterns: ['lint', 'test']
}]
With this array in hand, the task execution flow is easier to follow.
// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
try {
// Argument parsing omitted
// Run the tasks
const promise = argv.groups.reduce(
(prev, group) => {
// No tasks in this group: skip it and keep the previous promise
if (group.patterns.length === 0) {
return prev
}
return prev.then(() => runAll(
group.patterns, // ['lint', 'test']
{
// ……
// Whether to run in parallel
parallel: group.parallel,
// Maximum number of tasks to run in parallel
maxParallel: group.parallel ? argv.maxParallel : 1,
// Whether to keep running the other tasks after one fails
// ……
arguments: argv.rest,
// When one task exits with code 0, terminate all the other tasks
race: group.parallel && argv.race,
//……
}
))
},
Promise.resolve(null)
)
// ...
return promise
}
catch (err) {
//eslint-disable-next-line no-console
console.error("ERROR:", err.message)
return Promise.reject(err)
}
}
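The reduce call in npmRunAll above can be tried in isolation. In this minimal sketch, runGroups mirrors the chaining, while fakeRunAll is a hypothetical stand-in for runAll that only logs when a group starts and ends.

```javascript
// Chain groups so each one starts only after the previous one has finished.
function runGroups(groups, runGroup) {
    return groups.reduce(
        (prev, group) => (
            group.patterns.length === 0
                ? prev // empty group: nothing to run
                : prev.then(() => runGroup(group))
        ),
        Promise.resolve(null)
    )
}

// Hypothetical stand-in for runAll: records when a group starts and ends.
const log = []
function fakeRunAll(group) {
    const label = group.patterns.join(",")
    log.push(`start ${label}`)
    return new Promise(resolve => setTimeout(() => {
        log.push(`end ${label}`)
        resolve()
    }, 10))
}

const finished = runGroups(
    [
        { parallel: false, patterns: [] },
        { parallel: true, patterns: ["a", "b"] },
        { parallel: false, patterns: ["c", "d", "e"] },
    ],
    fakeRunAll
).then(() => log)

finished.then(entries => console.log(entries))
```

Because each step is attached with then, the second group cannot start before the first one ends, no matter how the tasks inside a group are scheduled.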
After parsing the command-line arguments, npmRunAll uses reduce to chain the task groups one after another. Task groups exist because npm-run-all lets a single command mix parallel and serial tasks, which produces multiple groups. For example:
$ npm-run-all -p a b -s c d e
This command produces two task groups: the parallel group is ['a', 'b'] and the serial group is ['c', 'd', 'e'], so runAll is called twice. Next, look at the logic for running a single task group:
// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
// Option normalization and default handling omitted……
try {
const patterns = parsePatterns(patternOrPatterns, args)
// Validation of illegal arguments and argument combinations omitted……
return Promise.resolve()
.then(() => {
if (taskList != null) {
return { taskList, packageInfo: null }
}
return readPackageJson()
})
.then(x => {
// Match tasks against package.json
// Here patterns is ['lint', 'test'], so both lint and test must exist in the scripts of package.json
const tasks = matchTasks(x.taskList, patterns)
return runTasks(tasks, {
// The options normalized and validated above are omitted……
})
})
}
catch (err) {
return Promise.reject(new Error(err.message))
}
}
The code above omits all the argument handling and validation; the core logic is matchTasks and runTasks. matchTasks reads the commands under scripts in package.json and checks whether each task in the group's patterns is in taskList.
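The matching step can be pictured with a deliberately simplified sketch. matchExactTasks is a hypothetical helper that accepts exact task names only (the real matchTasks also supports glob-like patterns, which are left out here), and the scripts object and error message are made up for illustration.

```javascript
// Simplified stand-in for matchTasks: exact task names only.
function matchExactTasks(taskList, patterns) {
    const known = new Set(taskList)
    return patterns.map(pattern => {
        if (!known.has(pattern)) {
            throw new Error(`Task not found: "${pattern}"`)
        }
        return pattern
    })
}

// Hypothetical scripts section of a package.json.
const scripts = { lint: "eslint lib", test: "mocha", build: "babel src" }
const taskList = Object.keys(scripts)

const tasks = matchExactTasks(taskList, ["lint", "test"])
console.log(tasks)
```

Failing early on an unknown name means a typo in the command line is reported before any task has started.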
Now we arrive at the core logic of this section: calling runTasks to run each task in the task group in turn:
module.exports = function runTasks(tasks, options) {
return new Promise((resolve, reject) => {
// No tasks in the group: resolve with an empty array
if (tasks.length === 0) {
resolve([])
return
}
// Result array
const results = tasks.map(task => ({ name: task, code: undefined }))
// Task queue
const queue = tasks.map((task, index) => ({ name: task, index }))
// Promises of the running tasks, used to tell when parallel tasks have all finished
const promises = []
let error = null
let aborted = false
/**
* Done.
* @returns {void}
*/
function done() {
// ……
}
/**
* Aborts all tasks.
* @returns {void}
*/
function abort() {
// ……
}
/**
* Runs a next task.
* @returns {void}
*/
function next() {
// The run was aborted, so there is nothing more to start
if (aborted) {
return
}
// In parallel mode the group is done only when both queue and promises are empty
if (queue.length === 0) {
if (promises.length === 0) {
done()
}
return
}
const task = queue.shift()
const promise = runTask(task.name, optionsClone)
promises.push(promise)
promise.then(
(result) => {
// When a task finishes, remove it from promises
remove(promises, promise)
if (aborted) {
return
}
if (result.code) {
error = new NpmRunAllError(result, results)
// Unless continueOnError is set, do not run the remaining tasks after a failure; abort the whole group
if (!options.continueOnError) {
abort()
return
}
}
// Aborts all tasks if options.race is true.
if (options.race && !result.code) {
abort()
return
}
// Call the next task.
next()
},
(thisError) => {
remove(promises, promise)
if (!options.continueOnError || options.race) {
error = thisError
abort()
return
}
next()
}
)
}
// Maximum concurrency
const max = options.maxParallel
// Take the smaller of the task count and the configured concurrency, in case maxParallel is larger than the actual number of tasks
const end = (typeof max === "number" && max > 0)
? Math.min(tasks.length, max)
: tasks.length
for (let i = 0; i < end; ++i) {
next()
}
})
}
Using the queue, each task in the group is run in turn. When a task succeeds, its result is recorded and next is called to run the following task; abort terminates all tasks; done finalizes the state of the whole queue and returns the results.
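The queue mechanics can be reproduced in a self-contained sketch. runQueue is a hypothetical, trimmed-down version of runTasks: it keeps the queue, the next function, and the concurrency cap, but leaves out abort, race, and continueOnError, and simply rejects on the first error. fakeTask stands in for runTask and records how many tasks run at once.

```javascript
// Trimmed-down task queue: at most maxParallel tasks run at the same time.
function runQueue(names, runOne, maxParallel) {
    return new Promise((resolve, reject) => {
        const results = names.map(name => ({ name, code: undefined }))
        if (names.length === 0) {
            resolve(results)
            return
        }
        const queue = names.map((name, index) => ({ name, index }))
        let running = 0
        let failed = false

        function next() {
            if (failed) {
                return
            }
            if (queue.length === 0) {
                // Done only when nothing is queued and nothing is running.
                if (running === 0) {
                    resolve(results)
                }
                return
            }
            const task = queue.shift()
            running += 1
            runOne(task.name).then(
                code => {
                    running -= 1
                    results[task.index].code = code
                    next()
                },
                err => {
                    running -= 1
                    failed = true
                    reject(err)
                }
            )
        }

        // Start as many tasks as the cap allows; each completion pulls the next one.
        const end = maxParallel > 0 ? Math.min(names.length, maxParallel) : names.length
        for (let i = 0; i < end; ++i) {
            next()
        }
    })
}

// Hypothetical task runner that tracks peak concurrency.
let active = 0
let peak = 0
function fakeTask() {
    active += 1
    peak = Math.max(peak, active)
    return new Promise(resolve => setTimeout(() => {
        active -= 1
        resolve(0)
    }, 10))
}

const demo = runQueue(["a", "b", "c", "d"], fakeTask, 2)
    .then(results => ({ results, peak }))
demo.then(outcome => console.log(outcome))
```

Passing 1 as the cap turns the same function into a serial runner, which is why one queue can serve both modes.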
Serial mechanism
Next, a diagram together with this section's example makes the serial mechanism easier to understand:
Walking through the flow from left to right:
On initialization, the task queue is generated from the task group's patterns;
After the task count end is computed, the next function is called. It takes the lint task from the queue and calls runTask to run it (shown in figure 2). (runTask is analyzed in detail in the next section.) When the task finishes, the following happens:
If the aggregateOutput option is set, the task's output is written from the in-memory stream to the output stream; result.code is updated, and if the task failed without continueOnError being set (!continueOnError), or the race option is set, abort is called directly to terminate the whole task queue and return the result;
If the task succeeded, or continueOnError is set, the next task is taken from the queue (shown in figure 3), so the test task runs and the same logic repeats. Finally, when no tasks remain in the queue, done is called, the task group is complete, and results is returned.
Parallel mechanism
The parallel mechanism differs in that next is called several times at initialization, and each call checks whether any tasks are still running.
The flow chart above shows what happens when we run the following command:
$ node ./bin/npm-run-all/index.js -p lint test --max-parallel 2
This command runs the lint and test tasks in parallel with a maximum concurrency of 2.
Back to the flow chart above:
On initialization, a task queue is created and the lint and test tasks are added to it;
On the first pass, because we are running concurrently, next is called twice, so the promises array holds two promise instances;
When the lint task completes (while test is still running, that is, the test promise has not settled), next is called again. It checks whether both the task queue and the list of running tasks are empty: if so it calls done to return the results, otherwise it does nothing and waits for the other tasks to finish.
When the test task completes (assuming lint has already finished), next runs again. This time both queue and promises have length 0, so done runs and outputs the task group's results.
Summary
In this section we saw that tasks in a task group, whether serial or parallel, are all run through a task queue. The difference is that serial mode calls next only once at the start, while parallel mode calls next several times depending on the options. When the queue is empty and all tasks have completed, the current task group ends and the results cached in results are returned.
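The number of initial next calls follows directly from the end computation in runTasks. The sketch below lifts that expression into a hypothetical helper, initialNextCalls, so the serial and parallel cases can be compared side by side.

```javascript
// Same expression as `end` in runTasks, extracted into a helper for illustration.
function initialNextCalls(taskCount, maxParallel) {
    return (typeof maxParallel === "number" && maxParallel > 0)
        ? Math.min(taskCount, maxParallel)
        : taskCount
}

console.log(initialNextCalls(2, 1)) // serial group: maxParallel is forced to 1, so 1 call
console.log(initialNextCalls(2, 2)) // lint and test with --max-parallel 2: 2 calls
console.log(initialNextCalls(2, 5)) // cap larger than the task count: still 2 calls
console.log(initialNextCalls(3, 0)) // no positive cap: every task starts at once, 3 calls
```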
How a single task is run
Now that the serial and parallel mechanisms of a task group are clear, this section looks at how an individual task is run.
module.exports = function runTask(task, options) {
let cp = null
const promise = new Promise((resolve, reject) => {
// Wrapping of the stdin, stdout, and stderr streams omitted……
// Write the task name to the output stream
if (options.printName && stdout != null) {
stdout.write(createHeader(
task,
options.packageInfo,
options.stdout.isTTY
))
}
// Path of the npm used to run the command, i.e. the npm-cli path
const npmPath = options.npmPath || process.env.npm_execpath
const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
// Executable path, usually the path to the global node binary
const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
// Check whether this is yarn
const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
const spawnArgs = ["run"]
if (npmPathIsJs) {
spawnArgs.unshift(npmPath)
}
if (!isYarn) {
Array.prototype.push.apply(spawnArgs, options.prefixOptions)
}
else if (options.prefixOptions.indexOf("--silent") !== -1) {
spawnArgs.push("--silent")
}
Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))
// Run the command
cp = spawn(execPath, spawnArgs, spawnOptions)
// Output stream formatting omitted……
// Register event handlers
cp.on("error", (err) => {
cp = null
reject(err)
})
cp.on("close", (code, signal) => {
cp = null
// On close, resolve with the task name and exit status
resolve({ task, code, signal })
})
})
// Attach an abort method to this promise for ending the current child process
promise.abort = function abort() {
if (cp != null) {
cp.kill()
cp = null
}
}
return promise
}
runTask does four things:
It wraps the standard input and output streams and adds header information such as the task name;
It works out how to run the task: it obtains the npm-cli and node paths, then assembles the full command for the task;
It calls the wrapped spawn to run the command and listens for the error and close events to return the result; because platforms behave differently, spawn is wrapped with cross-spawn;
It attaches an abort method to the task's promise for ending the current process; when the task group runs its abort function, this method is what actually gets called.
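The promise-plus-abort pattern can be sketched with Node's built-in child_process.spawn instead of the cross-spawn wrapper. runCommand is a hypothetical helper, and the child commands are small node one-liners chosen only so the example runs anywhere node is installed.

```javascript
const { spawn } = require("child_process")

// Wrap a child process in a promise and attach an abort method to it.
function runCommand(command, args) {
    let cp = null
    const promise = new Promise((resolve, reject) => {
        cp = spawn(command, args, { stdio: "ignore" })
        cp.on("error", err => {
            cp = null
            reject(err)
        })
        cp.on("close", (code, signal) => {
            cp = null
            resolve({ code, signal })
        })
    })
    // Calling abort kills the child if it is still running.
    promise.abort = function abort() {
        if (cp != null) {
            cp.kill()
            cp = null
        }
    }
    return promise
}

const ok = runCommand(process.execPath, ["-e", "process.exit(0)"])
const failing = runCommand(process.execPath, ["-e", "process.exit(3)"])
const killed = runCommand(process.execPath, ["-e", "setTimeout(() => {}, 60000)"])
killed.abort()

Promise.all([ok, failing, killed]).then(outcomes => console.log(outcomes))
```

A non-zero exit code still resolves the promise; only a failure to start the process rejects it, which matches how runTask separates the close and error events.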
Conclusion
Someone may ask: why read source code written six years ago? The syntax is dated, and you can even find labeled statements[3]. But what I want to convey is that the core logic of the npm-run-all package, implementing serial and parallel task flows through a task queue, is a classic model, and control logic such as continueOnError and race shows up everywhere. For example, Turborepo[4], a very advanced build system today, could reuse this task flow model for its pipeline control logic.