Lineage
注意
Lineage 支持是非常实验性的,可能会发生变化。
Airflow可以帮助跟踪数据的来源,发生的事情以及数据随时间的变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
气流通过任务的入口和出口跟踪数据。 让我们从一个例子开始,看看它是如何工作的。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta
FILE_CATEGORIES = [ "CAT1" , "CAT2" , "CAT3" ]
args = {
'owner' : 'airflow' ,
'start_date' : airflow . utils . dates . days_ago ( 2 )
}
dag = DAG (
dag_id = 'example_lineage' , default_args = args ,
schedule_interval = '0 0 * * *' ,
dagrun_timeout = timedelta ( minutes = 60 ))
f_final = File ( "/tmp/final" )
run_this_last = DummyOperator ( task_id = 'run_this_last' , dag = dag ,
inlets = { "auto" : True },
outlets = { "datasets" : [ f_final ,]})
f_in = File ( "/tmp/whole_directory/" )
outlets = []
for file in FILE_CATEGORIES :
f_out = File ( "/tmp/ {} /{{{{ execution_date }}}}" . format ( file ))
outlets . append ( f_out )
run_this = BashOperator (
task_id = 'run_me_first' , bash_command = 'echo 1' , dag = dag ,
inlets = { "datasets" : [ f_in ,]},
outlets = { "datasets" : outlets }
)
run_this . set_downstream ( run_this_last )
任务采用参数<cite>入口</cite>和<cite>出口</cite> 。 入口可以由数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite>手动定义,也可以配置为从上游任务中查找出口<cite>{“task_ids”:[“task_id1”,“task_id2”]}</cite>或者可以配置为从直接上游任务<cite>{“auto”:True}</cite>或它们的组合中获取出口。 出口被定义为数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite> 。 在执行任务时,数据集的任何字段都使用上下文进行模板化。
注意
如果操作员支持,操作员可以自动添加入口和出口。
在示例DAG任务中, <cite>run_me_first</cite>是一个BashOperator,它接收从列表生成的3个入口: <cite>CAT1</cite> , <cite>CAT2</cite> , <cite>CAT3</cite> 。 请注意, <cite>execution_date</cite>是一个模板化字段,将在任务运行时呈现。
注意
在幕后,Airflow将沿袭元数据作为任务的<cite>pre_execute</cite>方法的一部分进行准备。 当任务完成执行<cite>时,</cite>将调用<cite>post_execute</cite>并将lineage元数据推送到XCOM中。 因此,如果您要创建自己的覆盖此方法的运算符,请确保分别使用<cite>prepare_lineage</cite>和<cite>apply_lineage</cite>修饰您的方法。
Apache Atlas
Airflow可以将其沿袭元数据发送到Apache Atlas。 您需要启用<cite>atlas</cite>后端并正确配置它,例如在<cite>airflow.cfg中</cite> :
[ lineage ]
backend = airflow . lineage . backend . atlas
[ atlas ]
username = my_username
password = my_password
host = host
port = 21000
请确保安装了<cite>atlasclient</cite>软件包。