Cross-DAG Dependencies
When two DAGs have dependency relationships, it is worth considering combining them into a single DAG, which is usually simpler to understand. Airflow also offers better visual representation of dependencies for tasks on the same DAG. However, it is sometimes not practical to put all related tasks on the same DAG. For example:
Two DAGs may have different schedules. E.g. a weekly DAG may have tasks that depend on other tasks on a daily DAG.
Different teams are responsible for different DAGs, but these DAGs have some cross-DAG dependencies.
A task may depend on another task on the same DAG, but for a different execution_date (start of the data interval). Use execution_delta for tasks running at different times, like execution_delta=timedelta(hours=1) to check against a task that runs 1 hour earlier.
ExternalTaskSensor can be used to establish such dependencies across different DAGs. When it is used together with ExternalTaskMarker, clearing dependent tasks can also happen across different DAGs.
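The effect of execution_delta mentioned above can be pictured as plain date arithmetic: the sensor looks for the run of the external task whose execution_date equals the sensor's own execution_date minus execution_delta. The following is a minimal sketch using only the standard library. The helper name external_execution_date is hypothetical and is not part of Airflow.

```python
from datetime import datetime, timedelta


def external_execution_date(own_execution_date: datetime, execution_delta: timedelta) -> datetime:
    """Return the execution_date of the external run that would be checked.

    Hypothetical helper for illustration only: it mirrors the idea that
    execution_delta is subtracted from the sensor's own execution_date.
    """
    return own_execution_date - execution_delta


# A sensor run with execution_date 03:00 and execution_delta=timedelta(hours=1)
# checks the external task's run with execution_date 02:00.
sensor_run = datetime(2024, 1, 1, 3, 0)
target = external_execution_date(sensor_run, timedelta(hours=1))
print(target)  # 2024-01-01 02:00:00
```

A positive execution_delta therefore points at an earlier run, which matches the "runs 1 hour earlier" case described above.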
ExternalTaskSensor
Use the ExternalTaskSensor to make tasks on a DAG wait for another task on a different DAG for a specific execution_date.
ExternalTaskSensor also provides the allowed_states and failed_states parameters, which specify the states of the task on the remote DAG that the sensor treats as success or as failure.
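One way to picture how allowed_states and failed_states interact is as a three-way decision on each check: stop waiting, keep waiting, or fail. The sketch below is a simplified model under stated assumptions, not Airflow's actual implementation. The function name evaluate_external_state, its default values, and the exception types are hypothetical.

```python
from typing import Iterable


def evaluate_external_state(
    state: str,
    allowed_states: Iterable[str] = ("success",),
    failed_states: Iterable[str] = (),
) -> bool:
    """Simplified model of a single sensor check (not Airflow's code).

    Returns True when waiting can stop, False when the sensor should
    check again later, and raises when the external task is in a state
    listed in failed_states.
    """
    allowed, failed = set(allowed_states), set(failed_states)
    # Assumption of this sketch: a state cannot be both allowed and failed.
    if allowed & failed:
        raise ValueError("allowed_states and failed_states must not overlap")
    if state in failed:
        raise RuntimeError(f"external task is in failed state {state!r}")
    return state in allowed


# With the values used in the examples on this page:
done = evaluate_external_state("success", ["success"], ["failed", "skipped"])
waiting = evaluate_external_state("running", ["success"], ["failed", "skipped"])
print(done, waiting)  # True False
```

In this model a state that appears in neither list, such as "running", simply keeps the sensor waiting until its timeout is reached.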
airflow/example_dags/example_external_task_marker_dag.py
child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)
ExternalTaskSensor with task_group dependency
In addition, the ExternalTaskSensor can make tasks on a DAG wait for a task_group on a different DAG for a specific execution_date.
airflow/example_dags/example_external_task_marker_dag.py
child_task2 = ExternalTaskSensor(
    task_id="child_task2",
    external_dag_id=parent_dag.dag_id,
    external_task_group_id="parent_dag_task_group_id",
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)
ExternalTaskMarker
Use ExternalTaskMarker if child_task1 on child_dag for a specific execution_date should also be cleared whenever parent_task on parent_dag is cleared. Note that child_task1 will only be cleared if “Recursive” is selected when the user clears parent_task.
airflow/example_dags/example_external_task_marker_dag.py
parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)
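The role of the “Recursive” option can be illustrated with a small model of how clearing might follow marker links from one DAG to another. This is only a sketch under simplifying assumptions: the MARKERS dictionary and the tasks_to_clear function are hypothetical, and the model follows marker links only, ignoring everything else that clearing a task involves in Airflow.

```python
from typing import Dict, List, Tuple

TaskKey = Tuple[str, str]  # (dag_id, task_id)

# Hypothetical in-memory stand-in for the link that the ExternalTaskMarker
# above declares: parent_task points at child_task1 on the child DAG.
MARKERS: Dict[TaskKey, TaskKey] = {
    ("parent_dag", "parent_task"): ("example_external_task_marker_child", "child_task1"),
}


def tasks_to_clear(start: TaskKey, recursive: bool, markers: Dict[TaskKey, TaskKey] = MARKERS) -> List[TaskKey]:
    """Return the tasks this model would clear, starting from `start`."""
    cleared = [start]
    if not recursive:
        # Without "Recursive", marker links are not followed.
        return cleared
    seen = {start}
    current = start
    while current in markers:
        target = markers[current]
        if target in seen:  # guard against circular marker chains
            break
        cleared.append(target)
        seen.add(target)
        current = target
    return cleared


with_recursive = tasks_to_clear(("parent_dag", "parent_task"), recursive=True)
without_recursive = tasks_to_clear(("parent_dag", "parent_task"), recursive=False)
print(with_recursive)
print(without_recursive)
```

In this model, clearing parent_task without the recursive option leaves child_task1 untouched, which corresponds to the note above about selecting “Recursive”.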