调度和触发器
译者:@Ray
Airflow调度程序监视所有任务和所有DAG,并触发已满足其依赖关系的任务实例。 在幕后,它监视并与其可能包含的所有DAG对象的文件夹保持同步,并定期(每分钟左右)检查活动任务以查看是否可以触发它们。
Airflow调度程序旨在作为Airflow生产环境中的持久服务运行。 要开始,您需要做的就是执行airflow scheduler
。 它将使用airflow.cfg
指定的配置。
请注意,如果您在一天的schedule_interval
上运行DAG,则会在2016-01-01T23:59
之后不久触发标记为2016-01-01
的运行。 换句话说,作业实例在其覆盖的时间段结束后启动。
请注意,如果您运行一个schedule_interval
为1天的的DAG
,run
标记为2016-01-01
,那么它会在2016-01-01T23:59
之后马上触发。换句话说,一旦设定的时间周期结束后,工作实例将立马开始。
让我们重复一遍调度schedule_interval
在开始日期之后,在句点结束时运行您的作业一个schedule_interval
。
调度程序启动airflow.cfg
指定的执行程序的实例。 如果碰巧是LocalExecutor
,任务将作为子LocalExecutor
执行; 在CeleryExecutor
和MesosExecutor
的情况下,任务是远程执行的。
要启动调度程序,只需运行以下命令:
airflow scheduler
DAG运行
DAG Run是一个表示DAG实例化的对象。
每个DAG可能有也可能没有时间表,通知如何创建DAG Runs
。 schedule_interval
被定义为DAG参数,并且优选地接收作为str
的cron表达式或datetime.timedelta
对象。 或者,您也可以使用其中一个cron“预设”:
| 预置 | 含义 | cron的 |
| —- | —- | —- |
| None
| 不要安排,专门用于“外部触发”的DAG | |
| @once
| 安排一次,只安排一次 | |
| @hourly
| 在小时开始时每小时运行一次 | 0 * * * *
|
| @daily
| 午夜一天运行一次 | 0 0 * * *
|
| @weekly
| 周日早上每周午夜运行一次 | 0 0 * * 0
|
| @monthly
| 每个月的第一天午夜运行一次 | 0 0 1 * *
|
| @yearly
| 每年1月1日午夜运行一次 | 0 0 1 1 *
|
您的DAG将针对每个计划进行实例化,同时为每个计划创建DAG Run
条目。
DAG运行具有与它们相关联的状态(运行,失败,成功),并通知调度程序应该针对任务提交评估哪组调度。 如果没有DAG运行级别的元数据,Airflow调度程序将需要做更多的工作才能确定应该触发哪些任务并进行爬行。 在更改DAG的形状时,也可能会添加新任务,从而创建不需要的处理。
回填和追赶
具有start_date
(可能是end_date
)和schedule_interval
的Airflow DAG定义了一系列间隔,调度程序将这些间隔转换为单独的Dag运行并执行。 Airflow的一个关键功能是这些DAG运行是原子的幂等项,默认情况下,调度程序将检查DAG的生命周期(从开始到结束/现在,一次一个间隔)并启动DAG运行对于尚未运行(或已被清除)的任何间隔。 这个概念叫做Catchup。
如果你的DAG被编写来处理它自己的追赶(IE不仅限于间隔,而是改为“现在”。),那么你将需要关闭追赶(在DAG本身上使用dag.catchup = False
)或者默认情况下在配置文件级别使用catchup_by_default = False
。 这样做,是指示调度程序仅为DAG间隔序列的最新实例创建DAG运行。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
在上面的示例中,如果调度程序守护程序在2016-01-02上午6点(或从命令行)拾取DAG,则将创建单个DAG运行,其execution_date
为2016-01-01 ,下一个将在2016-01-03上午午夜后创建,执行日期为2016-01-02。
如果dag.catchup
值为True,则调度程序将为2015-12-01和2016-01-02之间的每个完成时间间隔创建一个DAG Run(但不是2016-01-02中的一个,因为该时间间隔)尚未完成)并且调度程序将按顺序执行它们。 对于可以轻松拆分为句点的原子数据集,此行为非常有用。 如果您的DAG运行在内部执行回填,则关闭追赶是很好的。
外部触发器
请注意,在运行airflow trigger_dag
命令时,也可以通过CLI手动创建DAG Runs
,您可以在其中定义特定的run_id
。 在调度程序外部创建的DAG Runs
与触发器的时间戳相关联,并将与预定的DAG runs
一起显示在UI中。
此外,您还可以使用Web UI手动触发DAG Run
(选项卡“DAG” - >列“链接” - >按钮“触发器Dag”)。
要牢记
- 第一个
DAG Run
是基于DAG中任务的最小start_date
创建的。 - 后续
DAG Runs
由调度程序进程根据您的DAG的schedule_interval
顺序创建。 - 当清除一组任务的状态以期让它们重新运行时,重要的是要记住
DAG Run
的状态,因为它定义了调度程序是否应该查看该运行的触发任务。
以下是一些可以取消阻止任务的方法 :
- 在UI中,您可以从任务实例对话框中清除 (如删除状态)各个任务实例,同时定义是否要包括过去/未来和上游/下游依赖项。 请注意,接下来会出现一个确认窗口,您可以看到要清除的设置。 您还可以清除与dag关联的所有任务实例。
- CLI命令
airflow clear -h
在清除任务实例状态时有很多选项,包括指定日期范围,通过指定正则表达式定位task_ids,包含上游和下游亲属的标志,以及特定状态下的目标任务实例(failed
或success
) - 清除任务实例将不再删除任务实例记录。 相反,它更新max_tries并将当前任务实例状态设置为None。
- 将任务实例标记为失败可以通过UI完成。 这可用于停止运行任务实例。
- 将任务实例标记为成功可以通过UI完成。 这主要是为了修复漏报,或者例如在Airflow之外应用修复时。
airflow backfill
CLI子命令具有--mark_success
标志,允许选择DAG的子部分以及指定日期范围。