当前位置:网站首页>任务流执行器是如何工作的?

任务流执行器是如何工作的?

2022-08-09 21:51:00 hebiwen95

最近在整一个 OpenAPI 编排器,想到 npm-run-all 的任务流。看了一下这个 6 年前的源码。npm-run-all[1] 是一个用来并行或者串行运行多个 npm 脚本的 CLI 工具。阅读完本文,你能收获到:

  • 了解整个流程概览;

  • 了解核心模块逻辑,入口分析、参数解析、任务流、任务执行等;

流程概览

直入主题,整个 npm-run-all 的整体执行流程如下:

当我们在终端敲入命令,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap 去分发不同命令不同参数的逻辑。help 和 version 比较简单,本文不做分析。任务控制方面,会先调用 npmRunAll 做参数解析,然后执行 runTasks 执行任务组中任务,全部任务执行后返回结果,结束整个流程。

入口分析

npm-run-all 包支持三条命令,我们看到源码根目录的 package.json 文件:

{
  "name": "npm-run-all",
  "version": "4.1.5",
  "description": "A CLI tool to run multiple npm-scripts in parallel or sequential.",
  "bin": {
    "run-p": "bin/run-p/index.js",
    "run-s": "bin/run-s/index.js",
    "npm-run-all": "bin/npm-run-all/index.js"
  },
  "main": "lib/index.js",
  "files": [
    "bin",
    "lib",
    "docs"
  ],
  "engines": {
    "node": ">= 4"
  }
}

bin 下面定义的命令脚本:

  • run-p,简化使用的脚本,代表并行执行脚本;

  • run-s,简化使用的脚本,代表串行执行脚本;

  • npm-run-all,复杂命令,通过 --serial 和 --parallel 参数实现前两者一样的效果。

直接看到 bin/npm-run-all/index.js

require("../common/bootstrap")("npm-run-all")

上述代码中,如果是执行 run-p 这条命令,则函数传入的参数是 run-prun-s 同理。bootstrap 通过参数的不同,将任务分发到 bin 下不同目录中:

.
├── common
│   ├── bootstrap.js
│   ├── parse-cli-args.js
│   └── version.js
├── npm-run-all
│   ├── help.js
│   ├── index.js
│   └── main.js
├── run-p
│   ├── help.js
│   ├── index.js
│   └── main.js
└── run-s
    ├── help.js
    ├── index.js
    └── main.js

照着上述代码结构,结合 ../common/bootstrap 代码:

"use strict"

module.exports = function bootstrap(name) {
    const argv = process.argv.slice(2)

    switch (argv[0]) {
        case undefined:
        case "-h":
        case "--help":
            return require(`../${name}/help`)(process.stdout)

        case "-v":
        case "--version":
            return require("./version")(process.stdout)

        default:
            // https://github.com/mysticatea/npm-run-all/issues/105
            // Avoid MaxListenersExceededWarnings.
            process.stdout.setMaxListeners(0)
            process.stderr.setMaxListeners(0)
            process.stdin.setMaxListeners(0)

            // Main
            return require(`../${name}/main`)(
                argv,
                process.stdout,
                process.stderr
            ).then(
                () => {
                    // I'm not sure why, but maybe the process never exits
                    // on Git Bash (MINGW64)
                    process.exit(0)
                },
                () => {
                    process.exit(1)
                }
            )
    }
}

bootstrap 函数依据调用的命令调用不同目录下的 help、version 或者调用 main 函数,达到了差异消除的效果。

然后再把目光放在 process.stdout.setMaxListeners(0) 这是啥玩意?打开 issue 链接[2],通过报错信息和翻阅官方文档:

By default EventEmitters will print a warning if more than 10 listeners are added for a particular event. This is a useful default that helps finding memory leaks. The emitter.setMaxListeners() method allows the limit to be modified for this specific EventEmitter instance. The value can be set to Infinity (or 0) to indicate an unlimited number of listeners.

默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告。这是一个有用的默认值,有助于发现内存泄漏。emitter.setMaxListeners() 方法允许为这个特定的 EventEmitter 实例修改限制。该值可以设置为 Infinity(或 0)以指示无限数量的侦听器。

为什么要处理这个情况呢?因为用户可能这么使用:

$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11

你永远想象不到用户会怎么使用你的工具!

参数解析

分析完不同命令的控制逻辑,我们进入核心的 npmRunAll 函数,参数解析部分逻辑如下:

module.exports = function npmRunAll(args, stdout, stderr) {
  try {
    const stdin = process.stdin
    const argv = parseCLIArgs(args)
  } catch () {
    // ...
  }
}

解析处理所有标准输入流参数,最终生成并返回 ArgumentSet 实例 set。parseCLIArgsCore 只看控制任务流执行的参数:

function addGroup(groups, initialValues) {
    groups.push(Object.assign(
        { parallel: false, patterns: [] },
        initialValues || {}
    ))
}

function parseCLIArgsCore(set, args) {
    LOOP:
    for (let i = 0; i < args.length; ++i) {
        const arg = args[i]

        switch (arg) {
       // ...
            case "-s":
            case "--sequential":
            case "--serial":
                if (set.singleMode && arg === "-s") {
                    set.silent = true
                    break
                }
                if (set.singleMode) {
                    throw new Error(`Invalid Option: ${arg}`)
                }
                addGroup(set.groups)
                break

            case "-p":
            case "--parallel":
                if (set.singleMode) {
                    throw new Error(`Invalid Option: ${arg}`)
                }
                addGroup(set.groups, { parallel: true })
                break

            default: {
                // ...
                break
            }
        }
    }

    // ...
    return set
}

将任务都装到 groups 数组中,如果是并行任务(传了 -p--parallel 参数),就给任务加上 { parallel: true } 标记。默认是 { parallel: false },即串行任务。

执行任务组

在进入这一小节之前,我们就 npm-run-all 源码在 scripts 下加一条 debug 命令:

$ "node ./bin/npm-run-all/index.js lint test"

解析完参数生成的 argv.groups 如下:

[{
  paralles: false,
  patterns: ['lint', 'test']
}]

有了这个数组结果,我们再看任务执行流程会更加明朗。

// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
    try {
      // 省略解析参数
        // 执行任务
        const promise = argv.groups.reduce(
            (prev, group) => {
                // 分组中没有任务,直接返回 null
                if (group.patterns.length === 0) {
                    return prev
                }
                return prev.then(() => runAll(
                    group.patterns, // ['lint', 'test']
                    {
                        // ……
                       // 是否并行执行
                        parallel: group.parallel,
                       // 并行的最大数量
                        maxParallel: group.parallel ? argv.maxParallel : 1,
                        // 一个任务失败后继续执行其他任务
                       // ……
                        arguments: argv.rest,
                        // 这个用于当任务以0码退出时,终止全部任务
                        race: group.parallel && argv.race,
                        //……
                    }
                ))
            },
            Promise.resolve(null)
        )
    // ...

        return promise
    }
    catch (err) { 
        //eslint-disable-next-line no-console
        console.error("ERROR:", err.message)

        return Promise.reject(err)
    }
}

上述代码解析完命令行中的参数之后,通过 reduce 拼接所有任务组的结果。任务组就是 npm-run-all 支持同时配置并行和串行的任务,并生成多个任务组。例如:

$ npm-run-all -p a b -s c d e

上述命令会生成两个任务组,并行任务组是 ['a', 'b'],串行任务组是 ['c', 'd', 'e']。就会执行两次 runAll。接下来就看看单个任务组的执行逻辑:

// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
    // 省略一系列参数格式化和默认值处理……
    try {
        const patterns = parsePatterns(patternOrPatterns, args)
        // 省略非法参数报错和参数之间的校验……

        return Promise.resolve()
            .then(() => {
                if (taskList != null) {
                    return { taskList, packageInfo: null }
                }
                return readPackageJson()
            })
            .then(x => {
                // 从 package.json 中匹配任务
             // 中 patterns 是 ['lint', 'tests'],所以 lint 和 test 这两个任务一定要从 package.json 的 scripts 中能查看到
             const tasks = matchTasks(x.taskList, patterns)
                
                return runTasks(tasks, {
                    // 省略上面格式化和校验后的参数……
                })
            })
    }
    catch (err) {
        return Promise.reject(new Error(err.message))
    }
}

上述代码将参数的处理和校验全部省略掉了,看到核心的逻辑 matchTasks 和 runTasksmatchTasks 通过读取 package.json 下 scripts 中的命令,然后判断任务组 patterns 中的任务是否都存在于 taskList 中。

然后就来到了本小节的核心逻辑——调用 runTasks 依次执行每个任务组中的任务:

module.exports = function runTasks(tasks, options) {
    return new Promise((resolve, reject) => {
       // 任务组中不存在任务,直接返回空数组
       if (tasks.length === 0) {
            resolve([])
            return
        }

       // 结果数组
        const results = tasks.map(task => ({ name: task, code: undefined }))
        // 任务队列
        const queue = tasks.map((task, index) => ({ name: task, index }))
        // 用于判断并行的时候任务是否全部完成
        const promises = []
        let error = null
        let aborted = false

        /**
         * Done.
         * @returns {void}
         */
        function done() {
            // ……
        }

        /**
         * Aborts all tasks.
         * @returns {void}
         */
        function abort() {
            // ……
        }

        /**
         * Runs a next task.
         * @returns {void}
         */
        function next() {
           // 任务被终止了,则不需要再往下执行了
            if (aborted) {
                return
            }
           // 并行时,只有满足 queue 和 promises 的长度都为 0,才可以判断任务组完成 
            if (queue.length === 0) {
                if (promises.length === 0) {
                    done()
                }
                return
            }

            const task = queue.shift()
            const promise = runTask(task.name, optionsClone)

            promises.push(promise)
            promise.then(
                (result) => {
                   // 完成一个任务,就将其从 promises 中删除
                    remove(promises, promise)
                    if (aborted) {
                        return
                    }

                    if (result.code) {
                        error = new NpmRunAllError(result, results)
                        // 失败后不继续执行后续的任务,则直接终止整个任务组
                        if (!options.continueOnError) {
                            abort()
                            return
                        }
                    }

                    // Aborts all tasks if options.race is true.
                    if (options.race && !result.code) {
                        abort()
                        return
                    }

                    // Call the next task.
                    next()
                },
                (thisError) => {
                    remove(promises, promise)
                    if (!options.continueOnError || options.race) {
                        error = thisError
                        abort()
                        return
                    }
                    next()
                }
            )
        }

       // 最大并发数
        const max = options.maxParallel
        // 对比任务数量、配置的并发数、取较小者。这是为了防止配置的 maxParallel 比实际执行的任务数量还大的情况
        const end = (typeof max === "number" && max > 0)
            ? Math.min(tasks.length, max)
            : tasks.length
        for (let i = 0; i < end; ++i) {
            next()
        }
    })
}

通过队列,依次执行组中的每一条任务,任务成功后将结果存入 result,然后调用 next 执行下一个任务;可以通过 abort 终止全部任务;通过 done 完成整个队列的状态更新,并将结果返回。

串行机制

接下来,通过一张图和本小节的示例,来更好地理解串行机制:

从左到右得流程,讲解一下:

  • 初始化时,根据任务组的 patterns,生成任务队列;

  • 计算完任务数量 end 之后,执行 next 函数。此时会从任务队列中取出 lint 任务,调用 runTask 去执行该任务(图2所示)。(runTask 的细节放到下一小节分析。)执行完成后,会执行以下子任务:

    • 如果配置了 aggregateOutput 参数,会将任务的输出流写入到内存流;

    • 更新 result.code,如果配置了失败不继续执行(!continueOnError) 或者 race 参数,就直接调用 abort 终止整个任务队列,返回结果;

  • 如果成功或配置失败了继续执行其他任务( continueOnError),就去任务队列中取出下一个任务(图3所示),会去执行 test 任务,重复上一步骤的逻辑。最终任务队列中没有其他任务了,此时也会执行 done 函数,结束整个任务组,并将 results 返回。

并行机制

并行机制的不同就在于初始的时候会调用多次 next 函数,并且会判断当前是否还有正在执行的任务。

上图是我们执行以下命令的流程图:

$ node ./bin/npm-run-all/index.js  -p lint test --max-parallel 2

命令的意思是并行执行 lint 和 test 任务,并且最大并发数是 2。

回到上面的流程图:

  • 初始时还是会创建一个任务队列,并将 lint 和 test 两个任务添加到队列中;

  • 然后在首次执行时,因为我们是并发执行,所以会调用两次 next 函数,promises 数组会保存两个 promise 实例;

  • 当 lint 任务先完成(此时 test 任务还在执行,即 test promise 还未结束),此时会再调用 next 函数。此时会判断任务队列和正在进行的任务队列是否为空,如果是的话就调用 done 返回结果,否则什么都不做,等待其他任务执行完成。

  • 当 test 任务也完成时(假设此时 lint 任务已经完成),同样也会再次执行 next。但此时 queue 和 promises 两个数组的长度都是 0,就执行 done 逻辑,输出任务组的结果。

小结

本节我们学习了任务组中的任务不管是串行机制还是并行机制,都通过任务队列依次执行。不同的是,串行是首次只执行一次 next,并行根据参数执行多次 next。当满足队列为空并且所有任务都完成,就结束当前任务组,并将缓存在 results 中的结果返回。

单个任务如何执行

了解完任务组的串行和并行机制,这一小节就来了解单个任务是如何被执行的。

module.exports = function runTask(task, options) {
    let cp = null
    const promise = new Promise((resolve, reject) => {
        // 包装输出、输入、错误信息流……

        // 在输出流中写入任务名称
        if (options.printName && stdout != null) {
            stdout.write(createHeader(
                task,
                options.packageInfo,
                options.stdout.isTTY
            ))
        }

        // 执行命令的 npm 路径,npm-cli 的路径
        const npmPath = options.npmPath || process.env.npm_execpath
        const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
        // 执行路径,一般是全局的 bin/node 路径
        const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
        // 判断是不是 yarn
        const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
        const spawnArgs = ["run"]

        if (npmPathIsJs) {
            spawnArgs.unshift(npmPath)
        }
        if (!isYarn) {
            Array.prototype.push.apply(spawnArgs, options.prefixOptions)
        }
        else if (options.prefixOptions.indexOf("--silent") !== -1) {
            spawnArgs.push("--silent")
        }
        Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))

       // 执行命令
        cp = spawn(execPath, spawnArgs, spawnOptions)

        // 省略输出流格式化……

        // Register
        cp.on("error", (err) => {
            cp = null
            reject(err)
        })
        cp.on("close", (code, signal) => {
            cp = null
           // 成功后返回任务名称、状态
            resolve({ task, code, signal })
        })
    })

    // 给当前的promise挂上静态方法,用于结束当前子进程任务
    promise.abort = function abort() {
        if (cp != null) {
            cp.kill()
            cp = null
        }
    }

    return promise
}

runTask 做了四件事:

  1. 格式化标准输入、输出流,添加一些任务名称头部信息之类的;

  2. 获取任务的执行器,获取 npm-cli、node 等路径信息,然后拼接整个任务的执行命令;

  3. 调用封装后的 spawn 执行命令,并监听 error 和 close 事件用于返回执行结果;因为系统的不一致,所以 spawn 通过 cross-spawn 做了一层封装。

  4. 给当前任务挂上了 abort 的静态方法,用于结束当前进程;当在任务组执行 abort 方法时,实际会调用这个静态方法。

总结

有人会问为什么要去看一个 6 年前写的源码?语法老旧、甚至还能看到标记声明[3] 。但我想表达的是,npm-run-all 这个包的核心逻辑——通过任务队列去实现串行和并行的任务流模型是非常经典的,类似 continueOnError、race 这样的逻辑控制节点也随处可见。例如当前非常先进的构建系统 Turborepo[4] ,它对 pipeline 的控制逻辑也能复用这个任务流模型。

原网站

版权声明
本文为[hebiwen95]所创,转载请带上原文链接,感谢
https://blog.csdn.net/hebiwen95/article/details/126246693