PythonOperator

Use the @task decorator to execute Python callables.

Warning

The @task decorator is recommended over the classic PythonOperator to execute Python callables.

airflow/example_dags/example_python_operator.py

    @task(task_id="print_the_context")
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return "Whatever you return gets printed in the logs"


    run_this = print_context()
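
Because the decorated function is ordinary Python, its argument handling can be checked without a scheduler. The following is a minimal sketch under stated assumptions: it omits the @task decorator, and fake_context is a hypothetical, hand-built stand-in for the context Airflow would supply at run time. It shows that ds is bound by name while every other key is collected into kwargs.

```python
def print_context(ds=None, **kwargs):
    """Same signature as the example above, without the @task decorator."""
    # "ds" is bound by name; all remaining context keys end up in kwargs.
    return f"ds={ds} other_keys={sorted(kwargs)}"


# Hypothetical stand-in for the Airflow context (real contexts hold many more keys).
fake_context = {"ds": "2024-01-01", "run_id": "manual__1", "params": {}}

summary = print_context(**fake_context)
print(summary)
```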

Passing in arguments

Pass extra arguments to the @task decorated function as you would with a normal Python function.

airflow/example_dags/example_python_operator.py

    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
    for i in range(5):

        @task(task_id=f"sleep_for_{i}")
        def my_sleeping_function(random_base):
            """This is a function that will run within the DAG execution"""
            time.sleep(random_base)

        sleeping_task = my_sleeping_function(random_base=float(i) / 10)

        # run_this and log_the_sql are the tasks created in the other examples of this section
        run_this >> log_the_sql >> sleeping_task
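
To preview what the loop above produces, the task ids and the random_base value passed to each task can be computed in plain Python. This is only an illustration of the arithmetic; planned_sleeps is a hypothetical name and no Airflow objects are created.

```python
# Map each generated task_id to the argument its task would receive.
planned_sleeps = {f"sleep_for_{i}": float(i) / 10 for i in range(5)}

for task_id, seconds in planned_sleeps.items():
    print(task_id, seconds)
```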

Templating

Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.

airflow/example_dags/example_python_operator.py

    @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
    def log_sql(**kwargs):
        logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))


    log_the_sql = log_sql()
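
From the callable's point of view, templates_dict arrives as an ordinary keyword argument whose values have already been rendered. The sketch below assumes rendering has happened and passes a hand-written dictionary instead; rendered_kwargs and the SQL text are hypothetical, and the function returns the query only so that the result can be inspected.

```python
import logging

logging.basicConfig(level=logging.INFO)


def log_sql(**kwargs):
    """Log the already-rendered query and return it for inspection."""
    query = str(kwargs["templates_dict"]["query"])
    logging.info("Python task decorator query: %s", query)
    return query


# Hypothetical value of templates_dict after rendering for one particular run.
rendered_kwargs = {
    "templates_dict": {"query": "SELECT * FROM events WHERE day = '2024-01-01'"}
}

logged_query = log_sql(**rendered_kwargs)
```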

PythonVirtualenvOperator

Use the @task.virtualenv decorator to execute Python callables inside a new Python virtual environment. The virtualenv package must be installed in the environment that runs Airflow (as an optional dependency: pip install apache-airflow[virtualenv] --constraint ...).

Warning

The @task.virtualenv decorator is recommended over the classic PythonVirtualenvOperator to execute Python callables inside new Python virtual environments.

TaskFlow example of using the PythonVirtualenvOperator:

airflow/example_dags/example_python_operator.py

    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing inside the function ensures that Airflow does not attempt to
        import the library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")


    virtualenv_task = callable_virtualenv()

Classic example of using the PythonVirtualenvOperator:

airflow/example_dags/example_python_operator.py

    # x is a Python callable defined elsewhere in the example DAG file
    virtual_classic = PythonVirtualenvOperator(
        task_id="virtualenv_classic",
        requirements="colorama==0.4.0",
        python_callable=x,
    )

Passing in arguments

Pass extra arguments to the @task.virtualenv decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var, ti and task_instance due to incompatibilities with the underlying library. To use Airflow context variables, make sure that Airflow is available inside the virtual environment, either by setting system_site_packages to True or by adding apache-airflow to the requirements argument. Otherwise you won’t have access to most of the Airflow context variables in op_kwargs. If you want the context values that are datetime objects, such as data_interval_start, also add pendulum and lazy_object_proxy.
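
One way to see why some values cannot cross into the virtual environment: arguments have to be serialized before they can be handed to a separate Python process. The sketch below assumes a pickle-style serializer and uses only the standard library; can_be_serialized is a hypothetical helper, and the lock is merely a stand-in for an object tied to the running process, not a description of what var or ti contain.

```python
import pickle
import threading


def can_be_serialized(value):
    """Return True if the standard pickle module can serialize ``value``."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


plain_arguments = {"random_base": 0.3, "labels": ["a", "b"]}
live_resource = threading.Lock()  # stand-in for a process-bound object

print(can_be_serialized(plain_arguments))
print(can_be_serialized(live_resource))
```

Plain data such as numbers, strings, lists and dictionaries is therefore the safest choice for arguments of a virtualenv task.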

If additional parameters for package installation are needed, pass them in requirements.txt as in the example below:

    SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
    AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

All supported options are listed in the pip documentation of the requirements file format.

ExternalPythonOperator

The ExternalPythonOperator lets you run some of your tasks with a different set of Python libraries than other tasks (and than the main Airflow environment). This can be a virtual environment or any Python installation that is preinstalled and available in the environment where the Airflow task runs. The operator takes the path to the Python binary as its python parameter. Note that even for a virtual environment, the python path should point to the Python binary inside the virtual environment (usually in the bin subdirectory of the virtual environment). Unlike regular use of a virtual environment, there is no need to activate it; simply using the Python binary activates it automatically. In both examples below, PATH_TO_PYTHON_BINARY is such a path, pointing to the executable Python binary.

Use the ExternalPythonOperator to execute Python callables inside a pre-defined environment. The virtualenv should be preinstalled in the environment where Python is run. If dill is used, it must be preinstalled in that environment, in the same version that is installed in the main Airflow environment.
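
The point about the binary path can be sketched with the standard library alone. In the snippet below, venv_python_binary is a hypothetical helper and /opt/venvs/reporting is a made-up location; the second half launches the current interpreter by its path as a stand-in for a virtualenv binary, to show that no activation step is involved in starting an interpreter this way.

```python
import os
import subprocess
import sys
from pathlib import Path


def venv_python_binary(venv_dir, windows=None):
    """Return the conventional interpreter path inside a virtual environment."""
    if windows is None:
        windows = os.name == "nt"
    if windows:
        return Path(venv_dir) / "Scripts" / "python.exe"
    return Path(venv_dir) / "bin" / "python"


# Hypothetical virtualenv location; substitute a real one.
PATH_TO_PYTHON_BINARY = venv_python_binary("/opt/venvs/reporting", windows=False)
print(PATH_TO_PYTHON_BINARY)

# Start an interpreter directly by path, with no "activate" step beforehand.
# sys.executable is used only so that the sketch runs on any machine.
result = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.prefix)"],
    capture_output=True,
    text=True,
    check=True,
)
child_prefix = result.stdout.strip()
print(child_prefix)
```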

TaskFlow example of using the operator:

airflow/example_dags/example_python_operator.py

    @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing inside the function ensures that Airflow does not attempt to
        import the library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")


    external_python_task = callable_external_python()

Classic example of using the operator:

airflow/example_dags/example_python_operator.py

    # x is a Python callable defined elsewhere in the example DAG file
    external_classic = ExternalPythonOperator(
        task_id="external_python_classic",
        python=PATH_TO_PYTHON_BINARY,
        python_callable=x,
    )

Passing in arguments

Pass extra arguments to the @task.external_python decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var, ti and task_instance due to incompatibilities with the underlying library. To use Airflow context variables, make sure that Airflow is also installed in the virtualenv, in the same version as the Airflow version the task runs on. Otherwise you won’t have access to most of the Airflow context variables in op_kwargs. If you want the context values that are datetime objects, such as data_interval_start, also add pendulum and lazy_object_proxy to your virtualenv.

ShortCircuitOperator

Use the @task.short_circuit decorator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained.

Warning

The @task.short_circuit decorator is recommended over the classic ShortCircuitOperator to short-circuit pipelines via Python callables.

The condition is evaluated from the output of the decorated function. If the decorated function returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output is pushed. If the output is False or a falsy value, the pipeline is short-circuited according to the configured short-circuiting behavior (described further down). In the example below, the tasks that follow the “condition_is_true” task will execute while the tasks downstream of the “condition_is_false” task will be skipped.

airflow/example_dags/example_short_circuit_decorator.py

    @task.short_circuit()
    def check_condition(condition):
        return condition


    ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
    ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]

    condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
    condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

    chain(condition_is_true, *ds_true)
    chain(condition_is_false, *ds_false)
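
The truthiness rule described above is the same one Python applies in an if statement. The sketch below is only an illustration of that rule, not Airflow's implementation; short_circuit_decision is a hypothetical helper name.

```python
def short_circuit_decision(output):
    """Map a callable's return value to the outcome described above."""
    return "continue" if bool(output) else "short-circuit"


# A few return values a decorated function might produce.
examples = [True, "ready", [1], False, None, 0, "", []]

for value in examples:
    print(repr(value), "->", short_circuit_decision(value))
```

Note that empty containers, empty strings, 0 and None all count as falsy, so a function that returns an empty list of results would short-circuit the pipeline.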

The “short-circuiting” can be configured to either respect or ignore the trigger rule defined for downstream tasks. If ignore_downstream_trigger_rules is set to True, the default configuration, all downstream tasks are skipped without considering the trigger_rule defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specified trigger_rule for other subsequent downstream tasks is respected. In this short-circuiting configuration, the operator assumes the direct downstream task(s) were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the short-circuiting task.

In the example below, notice that the “short_circuit” task is configured to respect downstream trigger rules. This means that while the tasks that follow the “short_circuit” task will be skipped since the decorated function returns False, “task_7” will still execute as it is set to execute when upstream tasks have completed running regardless of status (i.e. the TriggerRule.ALL_DONE trigger rule).

airflow/example_dags/example_short_circuit_decorator.py

    [task_1, task_2, task_3, task_4, task_5, task_6] = [
        EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
    ]

    task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

    short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
        condition=False
    )

    chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
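
The difference between the two settings can be traced on this graph with a small pure-Python model. This is a simplified sketch, not Airflow's scheduler: it assumes that chain links adjacent lists of equal length pairwise, that tasks without an explicit rule use all_success, and that an all_success task is skipped when any of its upstream tasks is skipped. All names in the block are hypothetical.

```python
# Edges assumed for chain(task_1, [task_2, short_circuit], [task_3, task_4],
# [task_5, task_6], task_7), with equal-length lists linked pairwise.
DOWNSTREAM = {
    "task_1": ["task_2", "short_circuit"],
    "task_2": ["task_3"],
    "short_circuit": ["task_4"],
    "task_3": ["task_5"],
    "task_4": ["task_6"],
    "task_5": ["task_7"],
    "task_6": ["task_7"],
    "task_7": [],
}
TRIGGER_RULES = {"task_7": "all_done"}  # every other task: "all_success"
TOPOLOGICAL_ORDER = [
    "task_1", "task_2", "short_circuit", "task_3",
    "task_4", "task_5", "task_6", "task_7",
]


def descendants(task):
    """Return every task reachable downstream of ``task``."""
    seen = set()
    stack = list(DOWNSTREAM[task])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


def skipped_tasks(ignore_downstream_trigger_rules):
    """Tasks skipped when "short_circuit" returns a falsy value."""
    if ignore_downstream_trigger_rules:
        # Everything downstream is skipped, whatever its trigger rule.
        return descendants("short_circuit")
    # Only direct downstream tasks are skipped outright; skips then
    # propagate through tasks that still use the all_success rule.
    skipped = set(DOWNSTREAM["short_circuit"])
    for task in TOPOLOGICAL_ORDER:
        upstream = [u for u, targets in DOWNSTREAM.items() if task in targets]
        rule = TRIGGER_RULES.get(task, "all_success")
        if rule == "all_success" and any(u in skipped for u in upstream):
            skipped.add(task)
    return skipped


print(sorted(skipped_tasks(ignore_downstream_trigger_rules=True)))
print(sorted(skipped_tasks(ignore_downstream_trigger_rules=False)))
```

Under these assumptions, task_7 is skipped only when downstream trigger rules are ignored, which matches the behavior described for the example.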

Passing in arguments

Pass extra arguments to the @task.short_circuit-decorated function as you would with a normal Python function.

Templating

Jinja templating can be used in the same way as described for the PythonOperator.

PythonSensor

Sensors can be used in two ways. One is to use the PythonSensor with an arbitrary callable for sensing; the callable should return True when it succeeds and False otherwise. The other is to use the TaskFlow API, applying the sensor as a decorator on a function.

airflow/example_dags/example_sensors.py

    t8 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)

    t9 = PythonSensor(
        task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
    )

airflow/example_dags/example_sensor_decorator.py

    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
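
The roles of poke_interval and timeout can be illustrated with a simplified polling loop. This is a sketch of the general idea only, not Airflow's sensor implementation: poke_until_done and FakeClock are hypothetical names, the fake clock replaces real waiting so the example finishes instantly, and the loop returns False on timeout where a real sensor would fail the task or, with soft_fail, skip it.

```python
def poke_until_done(python_callable, poke_interval, timeout, clock, sleep):
    """Poll ``python_callable`` every ``poke_interval`` seconds.

    Returns True as soon as the callable returns a truthy value, or False
    once ``timeout`` seconds have elapsed without success.
    """
    started = clock()
    while True:
        if python_callable():
            return True
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock: sleeping just advances the current time."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


# The condition becomes true on the third poke.
success_clock = FakeClock()
answers = iter([False, False, True])
succeeded = poke_until_done(
    lambda: next(answers),
    poke_interval=60,
    timeout=3600,
    clock=success_clock.time,
    sleep=success_clock.sleep,
)

# The condition never becomes true, so the loop gives up at the timeout.
failure_clock = FakeClock()
timed_out_result = poke_until_done(
    lambda: False,
    poke_interval=1,
    timeout=3,
    clock=failure_clock.time,
    sleep=failure_clock.sleep,
)

print(succeeded, success_clock.now)
print(timed_out_result, failure_clock.now)
```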