Cluster Policies

If you want to check or mutate DAGs or Tasks on a cluster-wide level, then a Cluster Policy will let you do that. They have three main purposes:

  • Checking that DAGs/Tasks meet a certain standard

  • Setting default arguments on DAGs/Tasks

  • Performing custom routing logic

There are three main types of cluster policy:

  • dag_policy: Takes a DAG parameter called dag. Runs at load time of the DAG from DagBag DagBag.

  • task_policy: Takes a BaseOperator parameter called task. The policy gets executed when the task is created during parsing of the task from DagBag at load time. This means that the whole task definition can be altered in the task policy. It does not relate to a specific task running in a DagRun. The task_policy defined is applied to all the task instances that will be executed in the future.

  • task_instance_mutation_hook: Takes a TaskInstance parameter called task_instance. The task_instance_mutation_hook applies not to a task but to the instance of a task that relates to a particular DagRun. It is executed in a “worker”, not in the dag file processor, just before the task instance is executed. The policy is only applied to the currently executed run (i.e. instance) of that task.

The DAG and Task cluster policies can raise the AirflowClusterPolicyViolation exception to indicate that the dag/task they were passed is not compliant and should not be loaded.

Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an sla on your Task in the DAG file, and then your cluster policy also sets an sla, the cluster policy’s value will take precedence.

How do define a policy function

There are two ways to configure cluster policies:

  1. create an airflow_local_settings.py file somewhere in the python search path (the config/ folder under your $AIRFLOW_HOME is a good “default” location) and then add callables to the file matching one or more of the cluster policy names above (e.g. dag_policy).

  2. By using a setuptools entrypoint in a custom module using the Pluggy interface.

    New in version 2.6.

    Note

    This is an experimental feature.

    This method is more advanced and for people who are already comfortable with python packaging.

    First create your policy function in a module:

    ``` from airflow.policies import hookimpl

  1. @hookimpl
  2. def task_policy(task) -> None:
  3. # Mutate task in place
  4. # ...
  5. print(f"Hello from {__file__}")
  6. ```
  7. And then add the entrypoint to your project specification. For example, using `pyproject.toml` and `setuptools`:
  8. ```
  9. [build-system]
  10. requires = ["setuptools", "wheel"]
  11. build-backend = "setuptools.build_meta"
  12. [project]
  13. name = "my-airflow-plugin"
  14. version = "0.0.1"
  15. # ...
  16. dependencies = ["apache-airflow>=2.6"]
  17. [project.entry-points.'airflow.policy']
  18. _ = 'my_airflow_plugin.policies'
  19. ```
  20. The entrypoint group must be `airflow.policy`, and the name is ignored. The value should be your module (or class) decorated with the `@hookimpl` marker.
  21. Once you have done that, and you have installed your distribution into your Airflow env, the policy functions will get called by the various Airflow components. (The exact call order is undefined, so don’t rely on any particular calling order if you have multiple plugins).

One important thing to note (for either means of defining policy functions) is that the argument names must exactly match as documented below.

Available Policy Functions

airflow.policies.task_policy(task)[source]

This policy setting allows altering tasks after they are loaded in the DagBag.

It allows administrator to rewire some task’s parameters. Alternatively you can raise AirflowClusterPolicyViolation exception to stop DAG from being executed.

Here are a few examples of how this can be useful:

  • You could enforce a specific queue (say the spark queue) for tasks using the SparkOperator to make sure that these tasks get wired to the right workers

  • You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours

  • Parameters

    task (airflow.models.baseoperator.BaseOperator) – task to be mutated

airflow.policies.dag_policy(dag)[source]

This policy setting allows altering DAGs after they are loaded in the DagBag.

It allows administrator to rewire some DAG’s parameters. Alternatively you can raise AirflowClusterPolicyViolation exception to stop DAG from being executed.

Here are a few examples of how this can be useful:

  • You could enforce default user for DAGs

  • Check if every DAG has configured tags

  • Parameters

    dag (airflow.models.dag.DAG) – dag to be mutated

airflow.policies.task_instance_mutation_hook(task_instance)[source]

This setting allows altering task instances before being queued by the Airflow scheduler.

This could be used, for instance, to modify the task instance during retries.

airflow.policies.pod_mutation_hook(pod)[source]

Mutate pod before scheduling.

This setting allows altering kubernetes.client.models.V1Pod object before they are passed to the Kubernetes client for scheduling.

This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.

airflow.policies.get_airflow_context_vars(context)[source]

This setting allows getting the airflow context vars, which are key value pairs. They are then injected to default airflow context vars, which in the end are available as environment variables when running tasks dag_id, task_id, execution_date, dag_run_id, try_number are reserved keys.

  • Parameters

    context – The context for the task_instance of interest.

Examples

DAG policies

This policy checks if each DAG has at least one tag defined:

  1. def dag_policy(dag: DAG):
  2. """Ensure that DAG has at least one tag"""
  3. if not dag.tags:
  4. raise AirflowClusterPolicyViolation(
  5. f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
  6. )

Note

To avoid import cycles, if you use DAG in type annotations in your cluster policy, be sure to import from airflow.models and not from airflow.

Note

DAG policies are applied after the DAG has been completely loaded, so overriding the default_args parameter has no effect. If you want to override the default operator settings, use task policies instead.

Task policies

Here’s an example of enforcing a maximum timeout policy on every task:

  1. class TimedOperator(BaseOperator, ABC):
  2. timeout: timedelta
  3. def task_policy(task: TimedOperator):
  4. if task.task_type == "HivePartitionSensor":
  5. task.queue = "sensor_queue"
  6. if task.timeout > timedelta(hours=48):
  7. task.timeout = timedelta(hours=48)

You could also implement to protect against common errors, rather than as technical security controls. For example, don’t run tasks without airflow owners:

  1. def task_must_have_owners(task: BaseOperator):
  2. if task.owner and not isinstance(task.owner, str):
  3. raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
  4. if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
  5. raise AirflowClusterPolicyViolation(
  6. f"""Task must have non-None non-default owner. Current value: {task.owner}"""
  7. )

If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs multiple of these custom checks and aggregates the various error messages so that a single AirflowClusterPolicyViolation can be reported in the UI (and import errors table in the database).

For example, your airflow_local_settings.py might follow this pattern:

  1. TASK_RULES: list[Callable[[BaseOperator], None]] = [
  2. task_must_have_owners,
  3. ]
  4. def _check_task_rules(current_task: BaseOperator):
  5. """Check task rules for given task."""
  6. notices = []
  7. for rule in TASK_RULES:
  8. try:
  9. rule(current_task)
  10. except AirflowClusterPolicyViolation as ex:
  11. notices.append(str(ex))
  12. if notices:
  13. notices_list = " * " + "\n * ".join(notices)
  14. raise AirflowClusterPolicyViolation(
  15. f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
  16. f"Notices:\n"
  17. f"{notices_list}"
  18. )
  19. def example_task_policy(task: BaseOperator):
  20. """Ensure Tasks have non-default owners."""
  21. _check_task_rules(task)

Task instance mutation

Here’s an example of re-routing tasks that are on their second (or greater) retry to a different queue:

  1. def task_instance_mutation_hook(task_instance: TaskInstance):
  2. if task_instance.try_number >= 1:
  3. task_instance.queue = "retry_queue"

Note that since priority weight is determined dynamically using weight rules, you cannot alter the priority_weight of a task instance within the mutation hook.