Cross-DAG Dependencies

When two DAGs have dependency relationships, it is worth considering combining them into a single DAG, which is usually simpler to understand. Airflow also offers better visual representation of dependencies for tasks on the same DAG. However, it is sometimes not practical to put all related tasks on the same DAG. For example:

  • Two DAGs may have different schedules. E.g. a weekly DAG may have tasks that depend on other tasks on a daily DAG.

  • Different teams are responsible for different DAGs, but these DAGs have some cross-DAG dependencies.

  • A task may depend on another task on the same DAG, but for a different execution_date (start of the data interval).

When the dependent task runs at a different time than the task it waits for, set execution_delta on the sensor. For example, execution_delta=timedelta(hours=1) checks against a task that runs 1 hour earlier.
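The date arithmetic behind execution_delta can be sketched in plain Python. This is a minimal illustration, not Airflow's implementation: the helper name external_logical_date and the sample dates are hypothetical, and it assumes that a positive execution_delta looks back in time from the sensor's own run date.

```python
from datetime import datetime, timedelta


def external_logical_date(logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Return the run date on the other DAG that the sensor would check.

    Assumption: a positive execution_delta looks back in time, so the
    target date is the sensor's own run date minus the delta.
    """
    return logical_date - execution_delta


# Hypothetical sample: a sensor run dated 02:00 with
# execution_delta=timedelta(hours=1) looks for the external run dated 01:00.
sensor_run = datetime(2024, 1, 1, 2, 0)
target_run = external_logical_date(sensor_run, timedelta(hours=1))
print(target_run.isoformat())
```

The same subtraction covers the weekly-on-daily case: a weekly run would pass whatever timedelta separates its own run date from the daily run it depends on.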

ExternalTaskSensor can be used to establish such dependencies across different DAGs. When it is used together with ExternalTaskMarker, clearing dependent tasks can also happen across different DAGs.

ExternalTaskSensor

Use the ExternalTaskSensor to make tasks on a DAG wait for another task on a different DAG for a specific execution_date.

ExternalTaskSensor also provides the allowed_states and failed_states parameters, which define the states of the task on the remote DAG that count as success and as failure.

airflow/example_dags/example_external_task_marker_dag.py

    child_task1 = ExternalTaskSensor(
        task_id="child_task1",
        external_dag_id=parent_dag.dag_id,
        external_task_id=parent_task.task_id,
        timeout=600,  # seconds to wait before the sensor gives up
        allowed_states=["success"],  # states that satisfy the sensor
        failed_states=["failed", "skipped"],  # states that fail the sensor
        mode="reschedule",  # release the worker slot between checks
    )
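The roles of allowed_states and failed_states can be pictured as a three-way decision made on each check. The sketch below is a simplified model under stated assumptions, not Airflow's actual code: the function poke_outcome and its return labels are hypothetical, and it assumes that any state listed in neither parameter simply means "keep waiting".

```python
from typing import Iterable, Optional


def poke_outcome(
    state: Optional[str],
    allowed_states: Iterable[str],
    failed_states: Iterable[str],
) -> str:
    """Classify one observed state of the external task.

    Hypothetical labels: "fail" stops the sensor with an error,
    "done" lets downstream tasks proceed, "wait" means check again later.
    """
    if state in set(failed_states):
        return "fail"
    if state in set(allowed_states):
        return "done"
    return "wait"


allowed = ["success"]
failed = ["failed", "skipped"]
for observed in ("success", "skipped", "running", None):
    print(observed, "->", poke_outcome(observed, allowed, failed))
```

With the values used in child_task1, a skipped parent task fails the sensor right away instead of leaving it to run into the 600-second timeout.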

ExternalTaskSensor with task_group dependency

In addition, the ExternalTaskSensor can make tasks on a DAG wait for a task_group on a different DAG for a specific execution_date.

airflow/example_dags/example_external_task_marker_dag.py

    child_task2 = ExternalTaskSensor(
        task_id="child_task2",
        external_dag_id=parent_dag.dag_id,
        external_task_group_id="parent_dag_task_group_id",  # wait on a task group rather than a single task
        timeout=600,  # seconds to wait before the sensor gives up
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
        mode="reschedule",  # release the worker slot between checks
    )
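Waiting on a task group can be modeled the same way, one level up. The following is a rough sketch, not Airflow's implementation: group_outcome and the task names are hypothetical, and it assumes that the group counts as done only when every task in it is in an allowed state, and as failed as soon as any task is in a failed state.

```python
from typing import Dict, Iterable, Optional


def group_outcome(
    task_states: Dict[str, Optional[str]],
    allowed_states: Iterable[str],
    failed_states: Iterable[str],
) -> str:
    """Classify a task group from the states of its member tasks.

    Assumption: one failed member fails the group, and the group is
    done only when all members are in an allowed state.
    """
    allowed, failed = set(allowed_states), set(failed_states)
    states = list(task_states.values())
    if any(state in failed for state in states):
        return "fail"
    if states and all(state in allowed for state in states):
        return "done"
    return "wait"


# Hypothetical members of parent_dag_task_group_id.
group = {"extract": "success", "load": "running"}
print(group_outcome(group, ["success"], ["failed", "skipped"]))
```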

ExternalTaskMarker

Use ExternalTaskMarker when clearing parent_task on parent_dag should also clear child_task1 on child_dag for a specific execution_date. Note that child_task1 is only cleared if “Recursive” is selected when the user clears parent_task.

airflow/example_dags/example_external_task_marker_dag.py

    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="example_external_task_marker_child",  # DAG that holds the dependent task
        external_task_id="child_task1",  # task cleared together with parent_task
    )
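The effect of the “Recursive” option can be sketched as a walk along marker links. This is a simplified model and not how Airflow implements clearing: tasks_to_clear and the markers mapping are hypothetical, each marker is assumed to point at exactly one task on another DAG, and clearing of ordinary downstream tasks inside each DAG is left out.

```python
from typing import Dict, List, Tuple

Task = Tuple[str, str]  # (dag_id, task_id)


def tasks_to_clear(start: Task, markers: Dict[Task, Task], recursive: bool) -> List[Task]:
    """Return the tasks cleared when `start` is cleared.

    Without `recursive`, only `start` is cleared. With it, marker links
    are followed across DAGs until the chain ends or would loop.
    """
    cleared = [start]
    seen = {start}
    current = start
    while recursive and current in markers:
        target = markers[current]
        if target in seen:  # guard against marker cycles
            break
        cleared.append(target)
        seen.add(target)
        current = target
    return cleared


# The marker from the example above, expressed as a link between two tasks.
markers = {
    ("parent_dag", "parent_task"): ("example_external_task_marker_child", "child_task1"),
}
print(tasks_to_clear(("parent_dag", "parent_task"), markers, recursive=True))
print(tasks_to_clear(("parent_dag", "parent_task"), markers, recursive=False))
```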