当前位置:网站首页>How do task flow executors work?

How do task flow executors work?

2022-08-09 23:44:00 hebiwen95

Recently in the a OpenAPI 编排器,想到 npm-run-all 的任务流.看了一下这个 6 Years ago, the source of.npm-run-all[1] Is a parallel or serial running multiple npm 脚本的 CLI 工具.阅读完本文,你能收获到:

  • To understand the whole process overview;

  • To understand the core module logic,入口分析、参数解析、任务流、任务执行等;

流程概览

直入主题,整个 npm-run-all 的整体执行流程如下:

When we are in the terminal type command,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap To distribute different command parameter logic.help 和 version 比较简单,本文不做分析.Mission control,会先调用 npmRunAll To do parameter parsing,然后执行 runTasks Perform a task in the group task,After all the task execution returns the result,结束整个流程.

入口分析

npm-run-all Package support three command,We see the source root directory package.json 文件:

{
  "name": "npm-run-all",
  "version": "4.1.5",
  "description": "A CLI tool to run multiple npm-scripts in parallel or sequential.",
  "bin": {
    "run-p": "bin/run-p/index.js",
    "run-s": "bin/run-s/index.js",
    "npm-run-all": "bin/npm-run-all/index.js"
  },
  "main": "lib/index.js",
  "files": [
    "bin",
    "lib",
    "docs"
  ],
  "engines": {
    "node": ">= 4"
  }
}

bin The following definition command scripts:

  • run-p,Simplify the use of script,On behalf of the parallel execution script;

  • run-s,Simplify the use of script,On behalf of the serial execution script;

  • npm-run-all,复杂命令,通过 --serial 和 --parallel Parameters to achieve the same effect as the first two.

直接看到 bin/npm-run-all/index.js

require("../common/bootstrap")("npm-run-all")

上述代码中,如果是执行 run-p 这条命令,The function of the incoming parameter is run-p,run-s 同理.bootstrap 通过参数的不同,将任务分发到 bin Under different directory:

.
├── common
│   ├── bootstrap.js
│   ├── parse-cli-args.js
│   └── version.js
├── npm-run-all
│   ├── help.js
│   ├── index.js
│   └── main.js
├── run-p
│   ├── help.js
│   ├── index.js
│   └── main.js
└── run-s
    ├── help.js
    ├── index.js
    └── main.js

According to the above code structure,结合 ../common/bootstrap 代码:

"use strict"

module.exports = function bootstrap(name) {
    const argv = process.argv.slice(2)

    switch (argv[0]) {
        case undefined:
        case "-h":
        case "--help":
            return require(`../${name}/help`)(process.stdout)

        case "-v":
        case "--version":
            return require("./version")(process.stdout)

        default:
            // https://github.com/mysticatea/npm-run-all/issues/105
            // Avoid MaxListenersExceededWarnings.
            process.stdout.setMaxListeners(0)
            process.stderr.setMaxListeners(0)
            process.stdin.setMaxListeners(0)

            // Main
            return require(`../${name}/main`)(
                argv,
                process.stdout,
                process.stderr
            ).then(
                () => {
                    // I'm not sure why, but maybe the process never exits
                    // on Git Bash (MINGW64)
                    process.exit(0)
                },
                () => {
                    process.exit(1)
                }
            )
    }
}

bootstrap Function calls different directory according to the command of call help、version 或者调用 main 函数,To achieve the effect of the difference to eliminate.

然后再把目光放在 process.stdout.setMaxListeners(0) 这是啥玩意?打开 issue 链接[2],Through the error message and read the official document:

By default EventEmitters will print a warning if more than 10 listeners are added for a particular event. This is a useful default that helps finding memory leaks. The emitter.setMaxListeners() method allows the limit to be modified for this specific EventEmitter instance. The value can be set to Infinity (or 0) to indicate an unlimited number of listeners.

默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告.这是一个有用的默认值,有助于发现内存泄漏.emitter.setMaxListeners() Method allows for the specific EventEmitter Instance modify restrictions.该值可以设置为 Infinity(或 0)To indicate an unlimited number of listeners.

Why do you want to deal with this situation?Because the user may use so:

$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11

You can never imagine how users will use your tools!

参数解析

After analyzing different command control logic,We entered the core of npmRunAll 函数,Parameter parsing logic is as follows:

module.exports = function npmRunAll(args, stdout, stderr) {
  try {
    const stdin = process.stdin
    const argv = parseCLIArgs(args)
  } catch () {
    // ...
  }
}

Analytical process all the standard input stream parameters,The resulting and return ArgumentSet 实例 set.parseCLIArgsCore Look only at the control task flow execution parameters:

function addGroup(groups, initialValues) {
    groups.push(Object.assign(
        { parallel: false, patterns: [] },
        initialValues || {}
    ))
}

function parseCLIArgsCore(set, args) {
    LOOP:
    for (let i = 0; i < args.length; ++i) {
        const arg = args[i]

        switch (arg) {
       // ...
            case "-s":
            case "--sequential":
            case "--serial":
                if (set.singleMode && arg === "-s") {
                    set.silent = true
                    break
                }
                if (set.singleMode) {
                    throw new Error(`Invalid Option: ${arg}`)
                }
                addGroup(set.groups)
                break

            case "-p":
            case "--parallel":
                if (set.singleMode) {
                    throw new Error(`Invalid Option: ${arg}`)
                }
                addGroup(set.groups, { parallel: true })
                break

            default: {
                // ...
                break
            }
        }
    }

    // ...
    return set
}

The task is to groups 数组中,If it is a parallel tasks(传了 -p--parallel 参数),Give tasks with { parallel: true } 标记.默认是 { parallel: false },A serial task.

Perform a task group

Before getting into this section,我们就 npm-run-all 源码在 scripts Next add a debug 命令:

$ "node ./bin/npm-run-all/index.js lint test"

Parse the parameters generated argv.groups 如下:

[{
  paralles: false,
  patterns: ['lint', 'test']
}]

With this array results,We then look at the task execution process will be more clear.

// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
    try {
      // Omit analytical parameters
        // 执行任务
        const promise = argv.groups.reduce(
            (prev, group) => {
                // No task in the group,直接返回 null
                if (group.patterns.length === 0) {
                    return prev
                }
                return prev.then(() => runAll(
                    group.patterns, // ['lint', 'test']
                    {
                        // ……
                       // 是否并行执行
                        parallel: group.parallel,
                       // The maximum number of parallel
                        maxParallel: group.parallel ? argv.maxParallel : 1,
                        // A task after failing to continue to perform other tasks
                       // ……
                        arguments: argv.rest,
                        // This is used for when the task is to0When they exit,Termination of all tasks
                        race: group.parallel && argv.race,
                        //……
                    }
                ))
            },
            Promise.resolve(null)
        )
    // ...

        return promise
    }
    catch (err) { 
        //eslint-disable-next-line no-console
        console.error("ERROR:", err.message)

        return Promise.reject(err)
    }
}

After the above code parsing command line parameters of,通过 reduce Stitching all the result of the task group.Task group is npm-run-all Support the configuration of parallel and serial tasks at the same time,And generate multiple task group.例如:

$ npm-run-all -p a b -s c d e

The above command will generate two task group,The parallel task group is ['a', 'b'],Serial task group is ['c', 'd', 'e'].就会执行两次 runAll.Then look at a single task group perform logical:

// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
    // Omit the formatted a series of parameters and the default processing……
    try {
        const patterns = parsePatterns(patternOrPatterns, args)
        // Omit the illegal parameter error and parameter calibration between……

        return Promise.resolve()
            .then(() => {
                if (taskList != null) {
                    return { taskList, packageInfo: null }
                }
                return readPackageJson()
            })
            .then(x => {
                // 从 package.json Matching task in
             // 中 patterns 是 ['lint', 'tests'],所以 lint 和 test Both of these tasks must be from package.json 的 scripts Can see the
             const tasks = matchTasks(x.taskList, patterns)
                
                return runTasks(tasks, {
                    // Omit the above format and after checking the parameters of the……
                })
            })
    }
    catch (err) {
        return Promise.reject(new Error(err.message))
    }
}

The above code will be omitted parameters processing and check all,See the core logic matchTasks 和 runTasks.matchTasks 通过读取 package.json 下 scripts 中的命令,Then judgment task group patterns Whether the task is to taskList 中.

Then came to the core logic of this section——调用 runTasks In turn perform each task in the group task:

module.exports = function runTasks(tasks, options) {
    return new Promise((resolve, reject) => {
       // Are not present in the task group task,直接返回空数组
       if (tasks.length === 0) {
            resolve([])
            return
        }

       // 结果数组
        const results = tasks.map(task => ({ name: task, code: undefined }))
        // 任务队列
        const queue = tasks.map((task, index) => ({ name: task, index }))
        // When used to judge the parallel tasks to be completed
        const promises = []
        let error = null
        let aborted = false

        /**
         * Done.
         * @returns {void}
         */
        function done() {
            // ……
        }

        /**
         * Aborts all tasks.
         * @returns {void}
         */
        function abort() {
            // ……
        }

        /**
         * Runs a next task.
         * @returns {void}
         */
        function next() {
           // 任务被终止了,You don't need to go back to perform
            if (aborted) {
                return
            }
           // 并行时,只有满足 queue 和 promises 的长度都为 0,Only can be done judgment task group 
            if (queue.length === 0) {
                if (promises.length === 0) {
                    done()
                }
                return
            }

            const task = queue.shift()
            const promise = runTask(task.name, optionsClone)

            promises.push(promise)
            promise.then(
                (result) => {
                   // 完成一个任务,就将其从 promises 中删除
                    remove(promises, promise)
                    if (aborted) {
                        return
                    }

                    if (result.code) {
                        error = new NpmRunAllError(result, results)
                        // Failure does not continue to carry out the tasks of follow-up after,Is directly to cancel the whole task group
                        if (!options.continueOnError) {
                            abort()
                            return
                        }
                    }

                    // Aborts all tasks if options.race is true.
                    if (options.race && !result.code) {
                        abort()
                        return
                    }

                    // Call the next task.
                    next()
                },
                (thisError) => {
                    remove(promises, promise)
                    if (!options.continueOnError || options.race) {
                        error = thisError
                        abort()
                        return
                    }
                    next()
                }
            )
        }

       // 最大并发数
        const max = options.maxParallel
        // Compare the task number、Configuration of the concurrency、取较小者.This is to prevent the configuration maxParallel Is greater than the actual number of task execution situation
        const end = (typeof max === "number" && max > 0)
            ? Math.min(tasks.length, max)
            : tasks.length
        for (let i = 0; i < end; ++i) {
            next()
        }
    })
}

通过队列,In turn perform each task in the group,After the success of the task will result in result,然后调用 next 执行下一个任务;可以通过 abort Termination of all tasks;通过 done Complete the entire queue status updates,并将结果返回.

Serial mechanism

接下来,Through a picture and this section of the sample,To better understand the serial mechanism:

From left to right to process,讲解一下:

  • 初始化时,According to the task group patterns,生成任务队列;

  • Calculate the number of tasks end 之后,执行 next 函数.At this point from the task queue lint 任务,调用 runTask 去执行该任务(图2所示).(runTask In the next section the details of the analysis.)执行完成后,Performs the following tasks:

    • 如果配置了 aggregateOutput 参数,Task will be written to the memory stream output flow;

    • 更新 result.code,If the configuration failure not to continue(!continueOnError) 或者 race 参数,就直接调用 abort Termination of the whole task queue,返回结果;

  • If success or configuration failed to perform other tasks( continueOnError),Go to task queue to retrieve the next task(图3所示),会去执行 test 任务,Repeat the step logic.There is no other task in the final task queue,此时也会执行 done 函数,The task group completed,并将 results 返回.

并行机制

Parallel mechanism of different is that the initial time would call many times next 函数,And will determine whether the current and executing tasks.

Above, we execute the following command flow chart:

$ node ./bin/npm-run-all/index.js  -p lint test --max-parallel 2

Command means that parallel execution lint 和 test 任务,And maximum concurrency is 2.

Back to the flow chart above:

  • When the initial will create a task queue,并将 lint 和 test Two tasks are added to the queue;

  • And then in the execution time for the first time,Because we are concurrent execution,So will call twice next 函数,promises Array will hold two promise 实例;

  • 当 lint The task to complete(此时 test 任务还在执行,即 test promise 还未结束),Will call again at this time next 函数.At this time will determine whether task queue and ongoing task queue is empty,如果是的话就调用 done 返回结果,否则什么都不做,Wait for the other tasks to complete.

  • 当 test When the task is complete(假设此时 lint 任务已经完成),May also be performed again next.但此时 queue 和 promises Both the length of the array is 0,就执行 done 逻辑,Output the result of the task group.

小结

In this section, we study the task in the task group either serial or parallel mechanism,都通过任务队列依次执行.不同的是,Serial is executed only once for the first time next,Parallel execution according to the parameters many times next.When meet the queue is empty and all the tasks are completed,Ends the current task group,And will be cached in results 中的结果返回.

A single task how to perform

Understand the task group of serial and parallel mechanism,This section is to understand how the individual tasks are performed.

module.exports = function runTask(task, options) {
    let cp = null
    const promise = new Promise((resolve, reject) => {
        // Packing output、输入、Wrong information……

        // Task name written into the output stream
        if (options.printName && stdout != null) {
            stdout.write(createHeader(
                task,
                options.packageInfo,
                options.stdout.isTTY
            ))
        }

        // 执行命令的 npm 路径,npm-cli 的路径
        const npmPath = options.npmPath || process.env.npm_execpath
        const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
        // 执行路径,Is generally a global bin/node 路径
        const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
        // 判断是不是 yarn
        const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
        const spawnArgs = ["run"]

        if (npmPathIsJs) {
            spawnArgs.unshift(npmPath)
        }
        if (!isYarn) {
            Array.prototype.push.apply(spawnArgs, options.prefixOptions)
        }
        else if (options.prefixOptions.indexOf("--silent") !== -1) {
            spawnArgs.push("--silent")
        }
        Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))

       // 执行命令
        cp = spawn(execPath, spawnArgs, spawnOptions)

        // Omit the output stream format……

        // Register
        cp.on("error", (err) => {
            cp = null
            reject(err)
        })
        cp.on("close", (code, signal) => {
            cp = null
           // After a successful return to the task name、状态
            resolve({ task, code, signal })
        })
    })

    // 给当前的promiseHang up the static method,To end the current child tasks
    promise.abort = function abort() {
        if (cp != null) {
            cp.kill()
            cp = null
        }
    }

    return promise
}

runTask 做了四件事:

  1. Format standard input、输出流,Add some task name header information such as;

  2. 获取任务的执行器,获取 npm-cli、node 等路径信息,And then joining together the whole task execute commands;

  3. 调用封装后的 spawn 执行命令,并监听 error 和 close Events for returning the execution result;因为系统的不一致,所以 spawn 通过 cross-spawn 做了一层封装.

  4. For the current task hung up the abort 的静态方法,To end the current process;When the task group perform abort 方法时,The actual would call this static method.

总结

Someone may ask: why do you want to go to see a 6 Years ago to write source code?The grammar of old、Can even see mark statement[3] .但我想表达的是,npm-run-all The core logic of the packet——Through the task queue to realize the task of serial and parallel flow model is a very classic,类似 continueOnError、race The logic control node are everywhere.Such as the current very advanced building systems Turborepo[4] ,它对 pipeline The control logic can also reuse the task flow model.

原网站

版权声明
本文为[hebiwen95]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/221/202208091959515780.html