Introduction to Airflow

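I. What Is Airflow

   Airflow is a platform for authoring, scheduling, and monitoring workflows. It was open-sourced by Airbnb and later donated to the Apache Software Foundation, where it has since graduated from incubation to a top-level project. Airflow models a workflow as a DAG (directed acyclic graph) of tasks, and its scheduler executes those tasks on a set of workers according to the specified dependencies. Airflow also provides a rich set of command-line tools and an easy-to-use user interface for inspecting and operating workflows, along with monitoring and alerting.
   Airflow schedules can be written in crontab syntax, but compared with crontab, Airflow lets you see task execution status and the logical dependencies between tasks at a glance, send email alerts when a task fails, and view task execution logs.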

Managing jobs with crontab, by contrast, has the following drawbacks:
  1. When many scheduled tasks run, the dependencies between them are hard to sort out.
  2. It is hard to see which task is currently running.
  3. When a task fails, it is inconvenient to view its execution log, so it is hard to locate the failing task and the cause of the error.
  4. It is inconvenient to see the start time, end time, and duration of each task in a scheduling flow, which is very important for optimizing tasks.
  5. It is inconvenient to keep a record of historical scheduled runs, which is important for optimizing jobs and troubleshooting.

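1. Pros and Cons

| Without Airflow | With Airflow |
| --- | --- |
| You must write your own scheduling code; debugging is complex, functionality is limited, and there is no overall scheduling capability | Framework-based scheduling that is simple to use, more stable, full-featured, and able to schedule everything as a whole |
| No graphical view, which makes adding and troubleshooting tasks difficult, especially when there are many tasks with a complex structure | Built-in tree and graph views that clearly show the task topology |
| You must write your own code to monitor tasks in real time | Real-time task status is shown in a web UI, making management and inspection easy |
| Most task operations require code or the command line, which is inefficient | Common operations are available in a graphical interface, efficient and clear |
| Scheduling code and business code must be separated manually | Scheduling and business code are separated, reducing coupling and easing operations and iteration |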

Despite these advantages, one drawback in engineering practice is that distributed deployment is somewhat cumbersome and error-prone.

II. Jobs and Tasks in Airflow

1. DAG

Overview: A DAG (Directed Acyclic Graph) is a directed graph that contains no cycles. In Airflow, a DAG defines a complete job. All tasks in the same DAG share the same schedule.

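Parameters:

  • dag_id: uniquely identifies the DAG, for later management
  • default_args: default arguments; if a task in the DAG does not set a parameter itself, the corresponding value from the DAG's default_args is used
  • schedule_interval: the DAG's execution period; crontab syntax can be used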
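The fallback rule for default_args can be illustrated with plain Python. The sketch below only models the precedence; resolve_task_args is a hypothetical helper, not part of Airflow. In an actual DAG file the dictionary would be passed as DAG(dag_id=..., default_args=default_args, schedule_interval=...), and operators created in that DAG pick up the defaults.

```python
# Hypothetical helper: models how a task falls back to the DAG's default_args
# when it does not set a parameter itself. This is not Airflow's actual code.
from datetime import datetime


def resolve_task_args(default_args: dict, **task_args) -> dict:
    """Return the effective arguments for one task.

    Values passed explicitly to the task win; anything missing is taken
    from the DAG-level default_args.
    """
    effective = dict(default_args)  # copy so the DAG defaults stay untouched
    effective.update(task_args)     # task-level settings override defaults
    return effective


default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 6, 2),
    "retries": 1,
}

# This task overrides retries but inherits owner and start_date.
args = resolve_task_args(default_args, task_id="load_data", retries=3)
print(args["owner"], args["retries"])  # airflow 3
```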

2. Task

Overview: A Task is a concrete unit of work within a DAG. It depends on the DAG, i.e., it must belong to some DAG. Dependencies can be configured between tasks within a DAG (dependencies across DAGs are also possible but not recommended, because they make the DAG graph less intuitive and complicate dependency management).

Parameters:

  • dag: a DAG instance, so that the task belongs to that DAG
  • task_id: an identifier (name) for the task, for later management
  • owner: the task's owner, for later management
  • start_date: the task's start time; the task begins to be scheduled from this point on
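To make the DAG/Task relationship concrete, here is a small stdlib-only model. MiniDAG and MiniTask are hypothetical stand-ins (real Airflow tasks are operator instances); the sketch assumes only what the list above says: a task belongs to one DAG, is named by task_id, has an owner, and is not scheduled before start_date.

```python
# Hypothetical mini-model of the DAG/Task relationship (not Airflow's classes).
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MiniDAG:
    dag_id: str
    tasks: dict = field(default_factory=dict)  # task_id -> MiniTask


@dataclass
class MiniTask:
    task_id: str
    dag: MiniDAG = field(repr=False)  # every task must belong to a DAG
    owner: str
    start_date: datetime

    def __post_init__(self):
        # A task registers itself with its DAG; task_id must be unique there.
        if self.task_id in self.dag.tasks:
            raise ValueError(f"duplicate task_id {self.task_id!r} in {self.dag.dag_id}")
        self.dag.tasks[self.task_id] = self

    def is_schedulable(self, now: datetime) -> bool:
        # Scheduling only begins once start_date has been reached.
        return now >= self.start_date


dag = MiniDAG("daily_etl")
extract = MiniTask("extract", dag, owner="data-team", start_date=datetime(2019, 6, 2))
print(list(dag.tasks))                               # ['extract']
print(extract.is_schedulable(datetime(2019, 6, 1)))  # False
```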

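III. Scheduling Times in Airflow

1. start_date

In the configuration, this is the time from which the job begins to be scheduled; when discussing a run's execution state, it refers to the scheduling start time.

2. schedule_interval

The scheduling period, i.e., how often the job runs.

3. execution_date

The "execution date". Despite the name, it is not the time at which the run actually executes (Airflow 2.2 and later also call it the logical date).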

[Key point]
For the first run, the recorded execution_date is the first time point at or after the job's configured start_date that satisfies schedule_interval. The run itself is triggered only when that interval ends, i.e., at the next time point that satisfies schedule_interval.

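[Example]
Suppose a job has start_date 2019-06-02 and schedule_interval 0 12 * * * (every day at 12:00). The first run is then triggered at 12:00 on 2019-06-03, and its execution_date is 2019-06-02 12:00. So execution_date does not literally mean the time the job executes; the actual execution time is the next time point after execution_date that satisfies schedule_interval.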
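The start_date / execution_date arithmetic from this example can be checked with a short Python sketch. It assumes a once-a-day schedule at a fixed time and ignores time zones; first_execution_date and actual_run_time are illustrative helpers, not Airflow APIs, and this is not a general cron parser.

```python
# Simplified model of a daily schedule such as "0 12 * * *" (every day at 12:00).
from datetime import datetime, time, timedelta


def first_execution_date(start_date: datetime, at: time) -> datetime:
    """First schedule point at or after start_date."""
    candidate = datetime.combine(start_date.date(), at)
    if candidate < start_date:
        candidate += timedelta(days=1)
    return candidate


def actual_run_time(execution_date: datetime) -> datetime:
    """The run is triggered at the *next* schedule point, i.e. when the
    one-day interval that starts at execution_date has ended."""
    return execution_date + timedelta(days=1)


start = datetime(2019, 6, 2)  # start_date = 2019-06-02 00:00
exec_date = first_execution_date(start, time(12, 0))
print(exec_date)                   # 2019-06-02 12:00:00
print(actual_run_time(exec_date))  # 2019-06-03 12:00:00
```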

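IV. Core Concepts of Airflow

DAGs: Directed acyclic graphs that organize all the tasks to be run according to their dependencies, describing the order in which the tasks execute.

Operators: Airflow ships with many operators. For example, BashOperator runs a bash command, PythonOperator calls an arbitrary Python function, EmailOperator sends email, SimpleHttpOperator sends HTTP requests, and SQL operators such as MySqlOperator run SQL commands. Users can also define their own operators, which is very convenient. An operator can be thought of as a class provided by Airflow that describes one kind of operation the user needs.

Tasks: A task is an instance of an operator.

Task Instance: Because a task is scheduled repeatedly, each run of a task is a different task instance. A task instance has its own state, such as "running", "success", "failed", "skipped", and "up for retry".

Task Relationships: Different tasks in a DAG can depend on one another.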
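The relationship between operators, tasks, task instances, and dependencies can be sketched without Airflow itself. In real DAG files, Airflow lets you write task_a >> task_b to declare that task_b runs after task_a; the MiniOperator classes below are hypothetical stand-ins that imitate that syntax and use graphlib (Python 3.9+) to derive an execution order.

```python
# Hypothetical miniature of Operator -> Task -> dependency, stdlib only.
from graphlib import TopologicalSorter  # Python 3.9+


class MiniOperator:
    """Stands in for an operator class; each instance plays the role of a task."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # task_ids that must finish before this one

    def __rshift__(self, other):
        # a >> b: b runs after a. Returning `other` allows a >> b >> c.
        other.upstream.add(self.task_id)
        return other


class MiniBashOperator(MiniOperator):
    def __init__(self, task_id, bash_command):
        super().__init__(task_id)
        self.bash_command = bash_command


extract = MiniBashOperator("extract", "echo extract")
transform = MiniBashOperator("transform", "echo transform")
load = MiniBashOperator("load", "echo load")
extract >> transform >> load

tasks = [extract, transform, load]
graph = {t.task_id: t.upstream for t in tasks}  # node -> predecessors
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['extract', 'transform', 'load']

# Each scheduled run of a task is a separate task instance, identified here
# by (task_id, execution_date) and carrying its own state.
task_instances = {(t.task_id, "2019-06-02"): "success" for t in tasks}
```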

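V. Airflow Components

1. webserver

Provides the web service, and periodically spawns subprocesses that scan the DAGs in the configured directory and update the database.
The webserver provides the following features:

  • Pause, resume, and trigger tasks.
  • Monitor running tasks and resume tasks from the point where they stopped.
  • Run ad-hoc commands or SQL statements to query task status, logs, and other details.
  • Configure connections, including but not limited to database and SSH connections.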

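The webserver daemon uses the gunicorn server (comparable to Tomcat in Java) to handle concurrent requests. The number of processes that handle requests is controlled by the workers value in the [webserver] section of {AIRFLOW_HOME}/airflow.cfg.
For example: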

[webserver]
# start 4 gunicorn worker processes to handle web requests
workers = 4

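2. scheduler

The task scheduling service. It periodically polls each DAG's schedule to decide whether to trigger task execution, creates tasks from the DAGs, and (with CeleryExecutor) submits them to the message broker queue (Redis or RabbitMQ).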

3. celery worker

Workers are distributed across different machines and are the nodes that actually execute tasks. They pick up tasks by listening to the message broker (Redis or RabbitMQ).
The worker daemon only needs to be started when Airflow's executor is set to CeleryExecutor. CeleryExecutor is recommended for production:

[core]
executor = CeleryExecutor
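The scheduler → broker → worker flow described above is a producer/consumer pattern. The toy model below assumes queue.Queue in place of Redis/RabbitMQ and threads in place of worker machines; it ignores task dependencies, retries, and result backends, and is not how Celery is implemented. The task names are hypothetical.

```python
# Toy model of the CeleryExecutor flow: scheduler -> broker queue -> workers.
import queue
import threading

broker = queue.Queue()  # stands in for Redis / RabbitMQ
results = []
results_lock = threading.Lock()


def scheduler(due_tasks):
    """Push every task that is due onto the broker (dependencies ignored)."""
    for task_id in due_tasks:
        broker.put(task_id)


def worker(name):
    """Take tasks from the broker until a stop signal (None) arrives."""
    while True:
        task_id = broker.get()
        if task_id is None:
            broker.task_done()
            break
        with results_lock:
            results.append((name, task_id))  # "execute" the task
        broker.task_done()


workers = [threading.Thread(target=worker, args=(f"worker-{i}",)) for i in range(2)]
for w in workers:
    w.start()

scheduler(["report_a", "report_b", "report_c"])
for _ in workers:
    broker.put(None)  # one stop signal per worker, queued after the tasks
for w in workers:
    w.join()

print(sorted(task for _, task in results))  # ['report_a', 'report_b', 'report_c']
```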

4. flower

Monitors whether worker processes are alive, starts or stops workers, and shows running tasks.
Its default port is 5555; open http://hostip:5555 in a browser to access flower and monitor the Celery message queue.

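VI. Airflow and the ETL Concept (Data Warehousing)

  ETL stands for Extract-Transform-Load and describes the process of extracting data from a source, transforming it, and loading it into a destination. The term is most common in data warehousing, but its use is not limited to data warehouses.
  Airflow was originally designed simply to handle ETL tasks well, but thanks to its good design it can be used to handle all kinds of task dependencies.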
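A minimal ETL run in plain Python, shown only to make the three steps concrete. The CSV text and the orders table are made-up sample data, and SQLite stands in for the target warehouse. In an Airflow DAG each step would typically become its own task, e.g. extract >> transform >> load.

```python
# Minimal extract-transform-load using only the standard library.
import csv
import io
import sqlite3

RAW_CSV = "order_id,amount\n1,10.5\n2,abc\n3,4.5\n"  # made-up source data


def extract(text):
    """Extract: read rows from the source (here an in-memory CSV)."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: convert types and drop rows that fail validation."""
    clean = []
    for row in rows:
        try:
            clean.append((int(row["order_id"]), float(row["amount"])))
        except ValueError:
            continue  # skip malformed rows such as amount="abc"
    return clean


def load(rows, conn):
    """Load: write the cleaned rows into the target table."""
    conn.execute("CREATE TABLE IF NOT EXISTS orders (order_id INTEGER, amount REAL)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
print(conn.execute("SELECT COUNT(*), SUM(amount) FROM orders").fetchone())  # (2, 15.0)
```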

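1. What Are Task Dependencies

  In large systems such as operations systems, data analytics systems, or testing systems, we usually face many kinds of dependency requirements.

  • For example:
    • Time dependency: a task must wait until a certain point in time to be triggered
    • External system dependency: a task depends on data in MySQL, HDFS, and so on, and these external systems must be accessed through their interfaces
    • Machine dependency: a task can run only on a specific machine, perhaps because that machine has plenty of memory or is the only one with certain library files
    • Inter-task dependency: task A must start after task B finishes, so the two tasks affect each other
    • Resource dependency: a task consumes a lot of resources, so tasks that use the same resource must be limited. For example, if a data conversion task needs 10 GB and the machine has 30 GB in total, at most two can run at once, and we want additional similar tasks to wait in a queue
    • Permission dependency: certain tasks can be started only by users with specific permissions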

  You might think this logic belongs inside each task's program, but I think it can be abstracted into the task control logic and decoupled from the actual task execution logic.

2. Understanding Crontab

  Let's first look at the most commonly used dependency management system: crontab.
  In all kinds of systems there are always some scheduled tasks to handle, and in those cases our first thought is usually crontab.

  Indeed, crontab handles scheduled tasks very well. For crontab, executing a task simply means invoking a program; whatever logic the program contains is outside crontab's responsibility (a good example of the KISS principle).

  So we can think of it abstractly:
  crontab is a dependency management system that manages only time dependencies.

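3. How Airflow Handles Dependencies

  The core concept in Airflow is the DAG (directed acyclic graph). A DAG consists of one or more tasks, and it is exactly what solves the inter-task dependency described above: Task B can run only after Task A finishes, and the dependencies among many tasks are expressed naturally as a DAG.

  Airflow fully supports crontab expressions, and it also accepts Python datetime objects to express times and timedelta objects to express time differences. This solves the time dependency of tasks.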
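As a sketch of the timedelta form: in a DAG file, schedule_interval can be a timedelta such as timedelta(hours=6) instead of a cron string. The helper below (a hypothetical name) just enumerates the resulting schedule points for one day, assuming naive datetimes.

```python
# Sketch: a schedule expressed as start_date plus a fixed timedelta.
from datetime import datetime, timedelta


def schedule_points(start_date, interval, until):
    """Yield every schedule point from start_date up to (not including) until."""
    point = start_date
    while point < until:
        yield point
        point += interval


points = list(schedule_points(datetime(2019, 6, 2), timedelta(hours=6), datetime(2019, 6, 3)))
print([p.strftime("%H:%M") for p in points])  # ['00:00', '06:00', '12:00', '18:00']
```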

  With CeleryExecutor, Airflow can start workers as different users, and different workers can listen to different queues, which solves the permission dependency problem. Workers can also run on multiple machines, which solves the machine dependency problem.
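Queue-based routing can be modeled as a simple mapping. In Airflow a task's queue is set with the operator's queue argument and a Celery worker is told which queues to consume; the task names, queue names, and worker names below are hypothetical.

```python
# Sketch of queue-based routing: each task names a queue, and each worker
# only consumes the queues it listens on.
from collections import defaultdict

task_queue = {
    "export_finance_report": "finance",  # needs a user with finance permissions
    "train_model": "gpu",                # must run on the GPU machine
    "send_email": "default",
}

worker_queues = {
    "worker-finance": {"finance"},
    "worker-gpu": {"gpu"},
    "worker-general": {"default"},
}


def route(task_queue, worker_queues):
    """Map each worker to the tasks it is allowed to pick up."""
    assignment = defaultdict(list)
    for task_id, q in task_queue.items():
        for worker, queues in worker_queues.items():
            if q in queues:
                assignment[worker].append(task_id)
    return dict(assignment)


print(route(task_queue, worker_queues)["worker-gpu"])  # ['train_model']
```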

  Airflow can assign any task to an abstract pool, and each pool has a fixed number of slots. Each running task occupies a slot (one by default); when all slots are taken, the remaining tasks wait. This solves the resource dependency problem.
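The pool/slot limit can be simulated as follows. The model assumes every task uses one slot and takes the same time, so tasks run in waves; real Airflow assigns a task to a pool through the operator's pool argument, and actual scheduling is far less regular. run_in_pool is a hypothetical helper.

```python
# Simplified pool model: at most `slots` tasks run at once; the rest wait.
def run_in_pool(task_ids, slots):
    """Return the list of waves; tasks beyond the free slots wait for the next wave."""
    if slots < 1:
        raise ValueError("a pool needs at least one slot")
    waves = []
    waiting = list(task_ids)
    while waiting:
        running, waiting = waiting[:slots], waiting[slots:]
        waves.append(running)
    return waves


# Memory-heavy conversions share a pool with 2 slots (hypothetical numbers,
# following the 10 GB task / 30 GB machine example).
print(run_in_pool(["conv1", "conv2", "conv3", "conv4", "conv5"], slots=2))
# [['conv1', 'conv2'], ['conv3', 'conv4'], ['conv5']]
```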

  Airflow has a hook mechanism (though I don't think "hook" is really the right name) for establishing connections to external data systems such as MySQL, HDFS, and the local file system (which also counts as an external system). By extending the hook interface you can connect to any external system, which solves the external system dependency problem.
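The value of hooks is that task code talks to an interface rather than a concrete system. The sketch below uses an abstract base class with a single get_conn method and one SQLite implementation; ExternalSystemHook and LocalSqliteHook are hypothetical names, not Airflow's own hook classes.

```python
# Sketch of the hook idea: one interface, one implementation per external system.
import sqlite3
from abc import ABC, abstractmethod


class ExternalSystemHook(ABC):
    @abstractmethod
    def get_conn(self):
        """Return a live connection to the external system."""


class LocalSqliteHook(ExternalSystemHook):
    """Connects to SQLite; ':memory:' keeps the example self-contained."""

    def __init__(self, path=":memory:"):
        self.path = path
        self._conn = None

    def get_conn(self):
        # Reuse one connection so an in-memory database keeps its data.
        if self._conn is None:
            self._conn = sqlite3.connect(self.path)
        return self._conn


def count_rows(hook: ExternalSystemHook, table: str) -> int:
    # Task logic depends only on the interface; swapping in another hook
    # (MySQL, HDFS, local files, ...) leaves this function unchanged.
    # `table` is assumed to be a trusted name, since it is formatted into SQL.
    return hook.get_conn().execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


hook = LocalSqliteHook()
hook.get_conn().execute("CREATE TABLE events (id INTEGER)")
hook.get_conn().executemany("INSERT INTO events VALUES (?)", [(1,), (2,), (3,)])
print(count_rows(hook, "events"))  # 3
```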

VII. How Airflow Executes Tasks

  Airflow can execute tasks in several ways, including SequentialExecutor, LocalExecutor, and CeleryExecutor; LocalExecutor and CeleryExecutor are the most widely used. The architectures of the three execution modes are described below.

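1. System Architecture Based on CeleryExecutor

  The diagram shows the architecture using Celery (the officially recommended approach; Airflow 1.x also supported deployment on Mesos). In it, turing is an external system and the GDags service helps assemble DAGs; both can be ignored.

  1. The master node's web UI manages DAGs, logs, and other information. The scheduler handles scheduling and supports only a single node; starting the scheduler on multiple nodes may cause it to fail (this applies to Airflow 1.x; Airflow 2.0 and later support running multiple schedulers).
  2. Workers execute the tasks of a specific DAG, so different tasks can run in different environments.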

[Figure: CeleryExecutor-based architecture]

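2. System Architecture Based on LocalExecutor

  An alternative approach is to assign each DAG to a single machine for execution. If the tasks are not complex and share the same environment, this approach works well: it is easy to scale out and manage, and there is no single point of failure on a master node.
[Figure: LocalExecutor-based architecture]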

3. System Architecture Based on SequentialExecutor

  SequentialExecutor runs tasks one at a time in a single process and is usually used only for testing.
[Figure: SequentialExecutor-based architecture]
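To illustrate the difference between sequential and parallel execution, the sketch below runs four identical sleeping tasks both ways. A thread pool stands in for LocalExecutor's worker processes, and the task function is hypothetical; timings are approximate and depend on the machine.

```python
# Sequential (SequentialExecutor-like) vs parallel (LocalExecutor-like) runs.
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    time.sleep(0.2)  # pretend work
    return name


names = ["a", "b", "c", "d"]

t0 = time.perf_counter()
sequential = [task(n) for n in names]  # one task after another
sequential_secs = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:  # up to 4 tasks at once
    parallel = list(pool.map(task, names))       # results keep input order
parallel_secs = time.perf_counter() - t0

print(f"sequential ~{sequential_secs:.1f}s, parallel ~{parallel_secs:.1f}s")
```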

Sometimes, however, configuring only job dependencies and schedule intervals is not enough for more complex requirements.

4. Other Scheduling Patterns

1) Skip DAG runs that are not the latest (e.g., a job failed and recovered some time later)
2) Skip the current DAG run while another DAG run is still executing (e.g., a run took so long that the next one was already due)
3) Alternatives to sensors (Airflow has a class of operators called sensors, which check whether a preset condition is met; once it is, the sensor task becomes Success and downstream tasks can run. The drawback is that if the upstream job runs for 3 hours, the sensor holds a worker for those 3 hours without releasing it, wasting resources.)
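The first two patterns reduce to simple decisions about DAG runs, sketched below with hypothetical helper functions. For reference, Airflow itself offers LatestOnlyOperator and the DAG-level catchup=False for the first pattern, and max_active_runs to limit concurrent runs (which makes later runs wait rather than skip).

```python
# Hypothetical decision helpers for patterns 1) and 2).
from datetime import datetime


def runs_to_execute(pending_runs):
    """Pattern 1: after an outage, keep only the newest pending DAG run."""
    return [max(pending_runs)] if pending_runs else []


def should_skip(running_runs):
    """Pattern 2: skip this DAG run if an earlier run is still executing."""
    return len(running_runs) > 0


pending = [datetime(2019, 6, d) for d in (2, 3, 4)]
print(runs_to_execute(pending))             # [datetime.datetime(2019, 6, 4, 0, 0)]
print(should_skip([datetime(2019, 6, 4)]))  # True
```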

VIII. Airflow's Original Architecture

[Figure: Airflow's original architecture]