Cluster Policies
If you want to check or mutate DAGs or Tasks on a cluster-wide level, then a Cluster Policy will let you do that. They have three main purposes:
Checking that DAGs/Tasks meet a certain standard
Setting default arguments on DAGs/Tasks
Performing custom routing logic
There are three main types of cluster policy:
`dag_policy`: Takes a DAG parameter called `dag`. Runs at load time of the DAG from DagBag.
`task_policy`: Takes a BaseOperator parameter called `task`. The policy gets executed when the task is created during parsing of the task from DagBag at load time. This means that the whole task definition can be altered in the task policy. It does not relate to a specific task running in a DagRun. The `task_policy` defined is applied to all the task instances that will be executed in the future.
`task_instance_mutation_hook`: Takes a TaskInstance parameter called `task_instance`. The `task_instance_mutation_hook` applies not to a task but to the instance of a task that relates to a particular DagRun. It is executed in a “worker”, not in the dag file processor, just before the task instance is executed. The policy is only applied to the currently executed run (i.e. instance) of that task.
The DAG and Task cluster policies can raise the AirflowClusterPolicyViolation exception to indicate that the dag/task they were passed is not compliant and should not be loaded.
Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an `sla` on your Task in the DAG file, and then your cluster policy also sets an `sla`, the cluster policy’s value will take precedence.
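The precedence rule follows from ordering: the policy runs after the DAG file has been parsed, so whatever it assigns overwrites the earlier value. A minimal sketch of that behaviour, using a `SimpleNamespace` stand-in instead of a real operator (an assumption, so the snippet runs without Airflow) and a hypothetical two-hour value:

```python
from datetime import timedelta
from types import SimpleNamespace


def task_policy(task) -> None:
    # Hypothetical cluster-wide value; assigned after the DAG file was parsed.
    task.sla = timedelta(hours=2)


# Stand-in for a task whose DAG file set sla=timedelta(hours=1).
task = SimpleNamespace(task_id="example", sla=timedelta(hours=1))
task_policy(task)
print(task.sla)  # 2:00:00
```

The value from the DAG file is simply replaced; the policy does not merge or compare the two.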
How to define a policy function
There are two ways to configure cluster policies:
Create an `airflow_local_settings.py` file somewhere in the Python search path (the `config/` folder under your $AIRFLOW_HOME is a good “default” location) and then add callables to the file matching one or more of the cluster policy names above (e.g. `dag_policy`).
Use a setuptools entrypoint in a custom module using the Pluggy interface.
New in version 2.6.
Note
This is an experimental feature.
This method is more advanced and for people who are already comfortable with python packaging.
First create your policy function in a module:
```
from airflow.policies import hookimpl


@hookimpl
def task_policy(task) -> None:
    # Mutate task in place
    # ...
    print(f"Hello from {__file__}")
```
And then add the entrypoint to your project specification. For example, using `pyproject.toml` and `setuptools`:
```
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "my-airflow-plugin"
version = "0.0.1"
# ...
dependencies = ["apache-airflow>=2.6"]
[project.entry-points.'airflow.policy']
_ = 'my_airflow_plugin.policies'
```
The entrypoint group must be `airflow.policy`, and the name is ignored. The value should be your module (or class) decorated with the `@hookimpl` marker.
Once you have done that, and you have installed your distribution into your Airflow environment, the policy functions will get called by the various Airflow components. (The exact call order is undefined, so don’t rely on any particular calling order if you have multiple plugins.)
One important thing to note (for either means of defining policy functions) is that the argument names must exactly match as documented below.
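As a sketch of what "exactly match" means in practice, the skeleton below uses the documented parameter names `dag`, `task` and `task_instance`. The empty function bodies and the `EXPECTED_ARGS` self-check are illustrative assumptions, not part of Airflow; the check relies only on the standard library's `inspect` module.

```python
import inspect


def dag_policy(dag) -> None:
    """The parameter must be named ``dag``."""


def task_policy(task) -> None:
    """The parameter must be named ``task``."""


def task_instance_mutation_hook(task_instance) -> None:
    """The parameter must be named ``task_instance``."""


# Hypothetical self-check: the documented argument name for each function.
EXPECTED_ARGS = {
    dag_policy: ["dag"],
    task_policy: ["task"],
    task_instance_mutation_hook: ["task_instance"],
}


def mismatched_policies() -> list[str]:
    """Return the names of policy functions whose parameters differ."""
    return [
        func.__name__
        for func, expected in EXPECTED_ARGS.items()
        if list(inspect.signature(func).parameters) != expected
    ]


print(mismatched_policies())  # []
```

Renaming a parameter, for example `task` to `t`, would make `mismatched_policies()` report that function.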
Available Policy Functions
airflow.policies.task_policy(task)
This policy setting allows altering tasks after they are loaded in the DagBag.
It allows an administrator to rewire some of a task’s parameters. Alternatively, you can raise an `AirflowClusterPolicyViolation` exception to stop the DAG from being executed.
Here are a few examples of how this can be useful:
You could enforce a specific queue (say the `spark` queue) for tasks using the `SparkOperator` to make sure that these tasks get wired to the right workers
You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours
Parameters
task (airflow.models.baseoperator.BaseOperator) – task to be mutated
airflow.policies.dag_policy(dag)
This policy setting allows altering DAGs after they are loaded in the DagBag.
It allows an administrator to rewire some of a DAG’s parameters. Alternatively, you can raise an `AirflowClusterPolicyViolation` exception to stop the DAG from being executed.
Here are a few examples of how this can be useful:
You could enforce a default user for DAGs
You could check that every DAG has configured tags
Parameters
dag (airflow.models.dag.DAG) – dag to be mutated
airflow.policies.task_instance_mutation_hook(task_instance)
This setting allows altering task instances before being queued by the Airflow scheduler.
This could be used, for instance, to modify the task instance during retries.
Parameters
task_instance (airflow.models.taskinstance.TaskInstance) – task instance to be mutated
airflow.policies.pod_mutation_hook(pod)
Mutate pod before scheduling.
This setting allows altering `kubernetes.client.models.V1Pod` objects before they are passed to the Kubernetes client for scheduling.
This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.
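A minimal sketch of the sidecar case. The container name and image are hypothetical, and the `SimpleNamespace` fallback exists only so the snippet runs where the `kubernetes` client is not installed; in a real deployment the hook receives a `kubernetes.client.models.V1Pod`.

```python
from types import SimpleNamespace

try:
    from kubernetes.client.models import V1Container
except ImportError:  # fallback so the sketch runs without the kubernetes client
    V1Container = SimpleNamespace


def pod_mutation_hook(pod) -> None:
    """Append a (hypothetical) log-shipping sidecar to the pod."""
    sidecar = V1Container(name="log-shipper", image="example.com/log-shipper:1.0")
    pod.spec.containers.append(sidecar)


# Stand-in pod with the same attribute layout as V1Pod.spec.containers.
pod = SimpleNamespace(spec=SimpleNamespace(containers=[SimpleNamespace(name="base")]))
pod_mutation_hook(pod)
print([c.name for c in pod.spec.containers])  # ['base', 'log-shipper']
```

The hook mutates the pod in place and returns nothing, in the same way as the other policy functions.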
airflow.policies.get_airflow_context_vars(context)
This setting allows getting the Airflow context vars, which are key-value pairs. They are then injected into the default Airflow context vars, which in the end are available as environment variables when running tasks. `dag_id`, `task_id`, `execution_date`, `dag_run_id` and `try_number` are reserved keys.
Parameters
context – The context for the task_instance of interest.
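A minimal sketch of such a function, assuming string keys and values. The `airflow_cluster` key and its value are hypothetical and chosen to avoid the reserved keys.

```python
def get_airflow_context_vars(context) -> dict[str, str]:
    """Return extra key-value pairs to add to the Airflow context vars."""
    # Hypothetical pair; any key other than the reserved ones could be used.
    return {"airflow_cluster": "main"}


print(get_airflow_context_vars({}))  # {'airflow_cluster': 'main'}
```

This sketch ignores `context`; a real function could derive values from it.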
Examples
DAG policies
This policy checks if each DAG has at least one tag defined:
```
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG


def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag"""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )
```
Note
To avoid import cycles, if you use `DAG` in type annotations in your cluster policy, be sure to import from `airflow.models` and not from `airflow`.
Note
DAG policies are applied after the DAG has been completely loaded, so overriding the `default_args` parameter has no effect. If you want to override the default operator settings, use task policies instead.
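One way to approximate a cluster-wide default is to adjust the attribute on each task from a task policy. A minimal sketch, assuming a hypothetical floor of two retries; the `SimpleNamespace` object is a stand-in for an operator so the snippet runs without Airflow:

```python
from types import SimpleNamespace

MIN_RETRIES = 2  # hypothetical cluster-wide floor


def task_policy(task) -> None:
    """Raise ``retries`` to the floor when the DAG file set a lower value."""
    if (task.retries or 0) < MIN_RETRIES:
        task.retries = MIN_RETRIES


task = SimpleNamespace(task_id="example", retries=0)
task_policy(task)
print(task.retries)  # 2
```

Tasks that already request more retries than the floor are left unchanged.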
Task policies
Here’s an example of enforcing a maximum timeout policy on every task:
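```
from abc import ABC
from datetime import timedelta

from airflow.models.baseoperator import BaseOperator


class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    # Route Hive partition sensors to a dedicated queue
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    # Cap the timeout at 48 hours
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)
```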
You could also implement policies to protect against common errors, rather than as technical security controls. For example, don’t run tasks without Airflow owners:
```
from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator


def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )
```
If you have multiple checks to apply, it is best practice to curate these rules in a separate Python module and have a single policy / task mutation hook that performs these custom checks and aggregates the various error messages, so that a single `AirflowClusterPolicyViolation` can be reported in the UI (and in the import errors table in the database).
For example, your `airflow_local_settings.py` might follow this pattern:
```
from typing import Callable

from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator

# task_must_have_owners is the rule function defined in the previous example
TASK_RULES: list[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)
```
Task instance mutation
Here’s an example of re-routing tasks that are on their second (or greater) retry to a different queue:
```
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"
```
Note that since priority weight is determined dynamically using weight rules, you cannot alter the `priority_weight` of a task instance within the mutation hook.