airflow.models.taskinstance
Module Contents
Classes
TaskReturnCode | Enum to signal manner of exit for task run command. |
TaskInstance | Task instances store the state of a task instance. |
SimpleTaskInstance | Simplified Task Instance. |
TaskInstanceNote | For storage of arbitrary notes concerning the task instance. |
Functions
set_current_context(context) | Sets the current execution context to the provided context object. |
clear_task_instances(tis, session[, …]) | Clears a set of task instances, but makes sure the running ones get killed. |
Attributes
airflow.models.taskinstance.TR[source]
airflow.models.taskinstance.log[source]
airflow.models.taskinstance.hybrid_property[source]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[source]
class airflow.models.taskinstance.TaskReturnCode[source]
Bases: enum.Enum
Enum to signal manner of exit for task run command.
DEFERRED = 100[source]
When task exits with deferral to trigger.
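As a rough illustration of how a caller might react to this return code, the sketch below defines a stand-in enum with the same member and value so that it runs without Airflow installed. The helper `describe_exit` is hypothetical and not part of this module.

```python
from enum import Enum
from typing import Optional


class TaskReturnCode(Enum):
    """Stand-in mirroring the documented enum (not imported from Airflow)."""

    DEFERRED = 100


def describe_exit(code: Optional[TaskReturnCode]) -> str:
    """Hypothetical helper: turn a task-run return code into a log message."""
    if code is TaskReturnCode.DEFERRED:
        return "task deferred to a trigger"
    return "task ran to completion"
```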
airflow.models.taskinstance.set_current_context(context)[source]
Sets the current execution context to the provided context object. This method should be called once per Task execution, before calling operator.execute.
airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[source]
Clears a set of task instances, but makes sure the running ones get killed. Also sets the DagRun’s state to QUEUED and start_date to the time of execution, but only for finished DagRuns (SUCCESS and FAILED). It does not clear the state and start_date of running DagRuns (QUEUED and RUNNING), because clearing the state of an already running DagRun is redundant and clearing start_date affects the DagRun’s duration.
Parameters
tis (list[TaskInstance]) – a list of task instances
session (sqlalchemy.orm.session.Session) – current session
dag_run_state (DagRunState | Literal[False]) – state to set finished DagRuns to. If set to False, DagRuns state will not be changed.
dag (DAG | None) – DAG object
activate_dag_runs (None) – Deprecated parameter, do not pass
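The DagRun state rule described above can be sketched as a small pure function. This is a simplified model under stated assumptions: `RunState` is a stand-in for DagRunState, `state_after_clear` is a hypothetical helper, and the real function also touches task instances and start_date, which the sketch leaves out.

```python
from enum import Enum
from typing import Union


class RunState(str, Enum):
    """Stand-in for DagRunState (not imported from Airflow)."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Only finished runs have their state reset when their tasks are cleared.
FINISHED = {RunState.SUCCESS, RunState.FAILED}


def state_after_clear(
    current: RunState, dag_run_state: Union[RunState, bool] = RunState.QUEUED
) -> RunState:
    """Return the DagRun state expected after clearing its task instances."""
    if dag_run_state is False:
        # Caller asked for DagRun states to be left unchanged.
        return current
    if current in FINISHED:
        return RunState(dag_run_state)
    # QUEUED and RUNNING runs keep their state.
    return current
```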
class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[source]
Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin
Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in.
The SqlAlchemy model doesn’t have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.
Database transactions on this table should guard against double triggers and any confusion around which task instances are or aren’t ready to run, even while multiple schedulers may be firing task instances.
A value of -1 in map_index represents any of: a TI without mapped tasks; a TI with mapped tasks that has yet to be expanded (state=pending); a TI with mapped tasks that expanded to an empty list (state=skipped).
property try_number[source]
Return the try number that this task instance will have when it is actually run.
If the TaskInstance is currently running, this will match the column in the database; in all other cases it will be incremented.
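The rule for try_number can be pictured with a minimal stand-in class. `AttemptCounter` and `stored_try_number` are hypothetical names chosen for this sketch, and the state strings are assumptions. The code only models the behaviour described above.

```python
class AttemptCounter:
    """Hypothetical stand-in for the try-number bookkeeping of a task instance."""

    def __init__(self, stored_try_number: int = 0, state: str = "queued") -> None:
        self.stored_try_number = stored_try_number  # value of the database column
        self.state = state

    @property
    def try_number(self) -> int:
        # While running, report the stored value as-is; otherwise report the
        # number the next run will carry.
        if self.state == "running":
            return self.stored_try_number
        return self.stored_try_number + 1
```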
property prev_attempted_tries: int[source]
Based on this instance’s try_number, this will calculate the number of previously attempted tries, defaulting to 0.
-
Log URL for TaskInstance
property mark_success_url: str[source]
URL to mark TI success
property key: airflow.models.taskinstancekey.TaskInstanceKey[source]
Returns a tuple that identifies the task instance uniquely
property is_premature: bool[source]
Returns whether a task is in UP_FOR_RETRY state and its retry interval has not yet elapsed.
property previous_ti: TaskInstance | None[source]
This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.
property previous_ti_success: TaskInstance | None[source]
This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.
property previous_start_date_success: pendulum.DateTime | None[source]
This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_start_date method.
__tablename__ = 'task_instance'[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
start_date[source]
end_date[source]
duration[source]
state[source]
max_tries[source]
hostname[source]
unixname[source]
job_id[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
queued_dttm[source]
queued_by_job_id[source]
pid[source]
executor_config[source]
updated_at[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
__table_args__ = ()[source]
dag_model[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
execution_date[source]
task_instance_note[source]
note[source]
task: airflow.models.operator.Operator[source]
is_trigger_log_context: bool = False[source]
Indicate to FileTaskHandler that logging context should be set up for trigger logging.
static insert_mapping(run_id, task, map_index)[source]
init_on_load()[source]
Initialize the attributes that aren’t stored in the DB
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]
Returns a command that can be executed anywhere Airflow is installed. This command is part of the message sent to executors by the orchestrator.
static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]
Generates the shell command required to execute this task instance.
Parameters
dag_id (str) – DAG ID
task_id (str) – Task ID
run_id (str) – The run_id of this task’s DagRun
mark_success (bool) – Whether to mark the task as successful
ignore_all_deps (bool) – Ignore all ignorable dependencies. Overrides the other ignore_* parameters.
ignore_depends_on_past (bool) – Ignore depends_on_past parameter of DAGs (e.g. for Backfills)
wait_for_past_depends_before_skipping (bool) – Wait for past depends before marking the ti as skipped
ignore_task_deps (bool) – Ignore task-specific dependencies such as depends_on_past and trigger rule
ignore_ti_state (bool) – Ignore the task instance’s previous failure/success
local (bool) – Whether to run the task locally
pickle_id (int | None) – If the DAG was serialized to the DB, the ID associated with the pickled DAG
file_path (PurePath | str | None) – path to the file containing the DAG definition
raw (bool) – raw mode (needs more details)
job_id (str | None) – job ID (needs more details)
pool (str | None) – the Airflow pool that the task should run in
cfg_path (str | None) – the Path to the configuration file
Returns
shell command that can be used to run the task instance
Return type
current_state(session=NEW_SESSION)[source]
Get the very latest state from the database. If a session is passed, it is used and the state lookup becomes part of that session; otherwise a new session is used.
sqlalchemy.inspect is used here to get the primary keys, ensuring that this will not regress if they change.
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
error(session=NEW_SESSION)[source]
Forces the task instance’s state to FAILED in the database.
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]
Refreshes the task instance from the database based on the primary key
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
lock_for_update (bool) – if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed.
refresh_from_task(task, pool_override=None)[source]
Copy common attributes from the given task.
Parameters
task (airflow.models.operator.Operator) – The task object to copy from
pool_override (str | None) – Use the pool_override instead of task’s pool
clear_xcom_data(session=NEW_SESSION)[source]
Clear all XCom data from the database for the task instance.
If the task is unmapped, all XComs matching this task ID in the same DAG run are removed. If the task is mapped, only the one with matching map index is removed.
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
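The selection rule for which XCom rows get cleared can be sketched over an in-memory list. This is an illustrative model only: `rows_to_clear` and the dictionary layout are hypothetical, and no database or Airflow API is involved.

```python
from typing import Any, Dict, List

XComRow = Dict[str, Any]


def rows_to_clear(
    rows: List[XComRow],
    dag_id: str,
    run_id: str,
    task_id: str,
    map_index: int,
    task_is_mapped: bool,
) -> List[XComRow]:
    """Select the rows that a clear would remove (hypothetical in-memory model)."""
    selected = [
        row
        for row in rows
        if row["dag_id"] == dag_id
        and row["run_id"] == run_id
        and row["task_id"] == task_id
    ]
    if task_is_mapped:
        # Mapped task: only the row for this instance's own map index goes.
        selected = [row for row in selected if row["map_index"] == map_index]
    return selected


rows = [
    {"dag_id": "etl", "run_id": "r1", "task_id": "load", "map_index": 0},
    {"dag_id": "etl", "run_id": "r1", "task_id": "load", "map_index": 1},
    {"dag_id": "etl", "run_id": "r2", "task_id": "load", "map_index": 0},
]
```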
set_state(state, session=NEW_SESSION)[source]
Set TaskInstance state.
Parameters
state (str | None) – State to set for the TI
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
Returns
Was the state changed
Return type
are_dependents_done(session=NEW_SESSION)[source]
Checks whether the immediate dependents of this task instance have succeeded or have been skipped. This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next schedule of a task until the dependents are done, for instance if the task DROPs and recreates a table.
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
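The check described above amounts to asking whether every immediate downstream task is in a "done" state. The sketch below models that with plain dictionaries. `dependents_done` and the state strings are assumptions for illustration, not Airflow's query.

```python
from typing import Dict, Iterable

# States that count as "done" for a dependent, per the description above.
DONE_STATES = {"success", "skipped"}


def dependents_done(downstream_task_ids: Iterable[str], states: Dict[str, str]) -> bool:
    """Return True when every immediate dependent has succeeded or been skipped."""
    return all(states.get(task_id) in DONE_STATES for task_id in downstream_task_ids)
```

A task with no dependents trivially passes this check, since there is nothing left to wait for.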
get_previous_dagrun(state=None, session=None)[source]
The DagRun that ran before this task instance’s DagRun.
Parameters
state (DagRunState | None) – If passed, only instances of the specified state are taken into account.
session (Session | None) – SQLAlchemy ORM Session.
get_previous_ti(state=None, session=NEW_SESSION)[source]
The task instance for the task that ran before this task instance.
Parameters
state (DagRunState | None) – If passed, only instances of the specified state are taken into account.
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
get_previous_execution_date(state=None, session=NEW_SESSION)[source]
The execution date from property previous_ti_success.
Parameters
state (DagRunState | None) – If passed, only instances of the specified state are taken into account.
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
get_previous_start_date(state=None, session=NEW_SESSION)[source]
The start date from property previous_ti_success.
Parameters
state (DagRunState | None) – If passed, only instances of the specified state are taken into account.
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]
Returns whether or not all the conditions are met for this task instance to be run given the context for the dependencies (e.g. a task instance being force run from the UI will ignore some dependencies).
Parameters
dep_context (DepContext | None) – The execution context that determines the dependencies that should be evaluated.
session (sqlalchemy.orm.session.Session) – database session
verbose (bool) – whether to log details on failed dependencies at info or debug log level
get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[source]
Get failed Dependencies
__repr__()[source]
Return repr(self).
next_retry_datetime()[source]
Get datetime of the next retry if the task instance fails. For exponential backoff, retry_delay is used as base and will be converted to seconds.
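To make the role of retry_delay as the base of an exponential backoff concrete, here is a deliberately simplified sketch. The doubling rule, the `max_delay` cap and the name `sketch_next_retry` are assumptions for illustration. The real method may apply further adjustments (such as jitter) that are omitted here.

```python
from datetime import datetime, timedelta
from typing import Optional


def sketch_next_retry(
    end_date: datetime,
    retry_delay: timedelta,
    attempt: int,
    max_delay: Optional[timedelta] = None,
) -> datetime:
    """Simplified backoff: the delay doubles with each attempt, starting at retry_delay."""
    base_seconds = retry_delay.total_seconds()  # retry_delay converted to seconds
    delay_seconds = base_seconds * (2 ** max(attempt - 1, 0))
    if max_delay is not None:
        # Optional cap so the wait cannot grow without bound.
        delay_seconds = min(delay_seconds, max_delay.total_seconds())
    return end_date + timedelta(seconds=delay_seconds)
```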
ready_for_retry()[source]
Checks on whether the task instance is in the right state and timeframe to be retried.
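The "right state and timeframe" condition can be sketched as a two-part test. The function name, the state string and the explicit `now` argument are hypothetical. The sketch only illustrates the shape of the check.

```python
from datetime import datetime


def sketch_ready_for_retry(state: str, next_retry_at: datetime, now: datetime) -> bool:
    """True when the instance is awaiting a retry and its retry time has passed."""
    return state == "up_for_retry" and next_retry_at < now
```

Passing `now` explicitly keeps the sketch deterministic, which makes it easy to exercise with fixed timestamps.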
get_dagrun(session=NEW_SESSION)[source]
Returns the DagRun for this TaskInstance
Parameters
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
Returns
DagRun
Return type
airflow.models.dagrun.DagRun
check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
Checks dependencies and then sets state to RUNNING if they are met. Returns True if and only if state is set to RUNNING, which implies that the task should be executed, in preparation for _run_raw_task.
Parameters
verbose (bool) – whether to turn on more verbose logging
ignore_all_deps (bool) – Ignore all of the non-critical dependencies and just run
ignore_depends_on_past (bool) – Ignore depends_on_past DAG attribute
wait_for_past_depends_before_skipping (bool) – Wait for past depends before marking the ti as skipped
ignore_task_deps (bool) – Don’t check the dependencies of this TaskInstance’s task
ignore_ti_state (bool) – Disregards previous task instance state
mark_success (bool) – Don’t run the task, mark its state as success
test_mode (bool) – Doesn’t record success or failure in the DB
job_id (str | None) – Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
pool (str | None) – specifies the pool to use to run the task instance
external_executor_id (str | None) – The identifier of the celery executor
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
Returns
whether the state was changed to running or not
Return type
clear_next_method_args()[source]
run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION)[source]
Run TaskInstance
dry_run()[source]
Only Renders Templates for the TI
static get_truncated_error_traceback(error, truncate_to)[source]
Truncates the traceback of an exception to the first frame called from within a given function
Parameters
error (BaseException) – exception to get traceback from
truncate_to (Callable) – Function to truncate TB to. Must have a __code__ attribute
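The truncation described above can be sketched with the standard library alone: walk the traceback until the frame running `truncate_to` is found, then return what follows it. `truncate_traceback`, `user_callable` and `runner` are hypothetical names, and the fallback to the full traceback is an assumption of this sketch.

```python
import traceback
from types import TracebackType
from typing import Callable, Optional


def truncate_traceback(
    error: BaseException, truncate_to: Callable
) -> Optional[TracebackType]:
    """Return the traceback starting at the first frame called from truncate_to."""
    tb = error.__traceback__
    code = truncate_to.__code__  # this is why a __code__ attribute is required
    while tb is not None:
        if tb.tb_frame.f_code is code:
            return tb.tb_next
        tb = tb.tb_next
    # truncate_to never appeared on the stack: fall back to the full traceback.
    return error.__traceback__


def user_callable() -> None:
    raise ValueError("boom")


def runner() -> None:
    user_callable()


try:
    runner()
except ValueError as exc:
    caught = exc

truncated = truncate_traceback(caught, runner)
frame_names = [frame.name for frame in traceback.extract_tb(truncated)]
```

Here the frames belonging to `runner` and its callers are dropped, leaving only the frames that ran inside it.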
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]
Handle Failure for the TaskInstance
is_eligible_to_retry()[source]
Is task instance eligible for retry
get_template_context(session=None, ignore_param_exceptions=True)[source]
Return TI Context
get_rendered_template_fields(session=NEW_SESSION)[source]
Update task with rendered template fields for presentation in UI. If task has already run, will fetch from DB; otherwise will render.
get_rendered_k8s_spec(session=NEW_SESSION)[source]
Fetch rendered template fields from DB
overwrite_params_with_dag_run_conf(params, dag_run)[source]
Overwrite Task Params with DagRun.conf
render_templates(context=None)[source]
Render templates in the operator fields.
If the task was originally mapped, this may replace self.task with the unmapped, fully rendered BaseOperator. The original self.task before replacement is returned.
render_k8s_pod_yaml()[source]
Render k8s pod yaml
get_email_subject_content(exception, task=None)[source]
Get the email subject content for exceptions.
email_alert(exception, task)[source]
Send alert email with exception information.
set_duration()[source]
Set TI duration
xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]
Make an XCom available for tasks to pull.
Parameters
key (str) – Key to store the value under.
value (Any) – Value to store. What types are possible depends on whether enable_xcom_pickling is true or not. If so, this can be any picklable object; otherwise, only JSON-serializable values may be used.
execution_date (datetime | None) – Deprecated parameter that has no effect.
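A rough way to check ahead of time whether a value would be storable under either setting is to try serializing it. The sketch below uses the standard `json` and `pickle` modules as an approximation. `can_store` is a hypothetical helper, and Airflow's own serializer may accept types that plain `json` rejects.

```python
import json
import pickle
from typing import Any


def can_store(value: Any, pickling_enabled: bool) -> bool:
    """Approximate check: is `value` picklable or JSON-serializable as required?"""
    try:
        if pickling_enabled:
            pickle.dumps(value)
        else:
            json.dumps(value)
    except Exception:
        # Any serialization failure means the value cannot be stored this way.
        return False
    return True
```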
xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]
Pull XComs that optionally meet certain criteria.
Parameters
key (str) – A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass None.
task_ids (str | Iterable[str] | None) – Only XComs from tasks with matching ids will be pulled. Pass None to remove the filter.
dag_id (str | None) – If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.
map_indexes (int | Iterable[int] | None) – If provided, only pull XComs with matching indexes. If None (default), this is inferred from the task(s) being pulled (see below for details).
include_prior_dates (bool) – If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
When pulling one single task (task_ids is None or a str) without specifying map_indexes, the return value is inferred from whether the specified task is mapped. If not, the value from the one single task instance is returned. If the task to pull is mapped, an iterator (not a list) yielding XComs from mapped task instances is returned. In either case, default (None if not specified) is returned if no matching XComs are found.
When pulling multiple tasks (i.e. either task_ids or map_indexes is a non-str iterable), a list of matching XComs is returned. Elements in the list are ordered by item ordering in task_ids and map_indexes.
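The difference between pulling from one task and pulling from several can be sketched with an in-memory store. This model covers unmapped tasks only. `sketch_pull` and the `(task_id, key)` store layout are hypothetical and do not reflect how XComs are actually persisted.

```python
from typing import Any, Dict, Iterable, Tuple, Union

Store = Dict[Tuple[str, str], Any]  # (task_id, key) -> value


def sketch_pull(
    store: Store,
    task_ids: Union[str, Iterable[str]],
    key: str = "return_value",
    default: Any = None,
) -> Any:
    """Single task id: one value or `default`. Several ids: a list in request order."""
    if isinstance(task_ids, str):
        return store.get((task_ids, key), default)
    # Non-str iterable: keep only matches, ordered as the ids were given.
    return [store[(task_id, key)] for task_id in task_ids if (task_id, key) in store]


store: Store = {
    ("extract", "return_value"): [1, 2, 3],
    ("transform", "return_value"): [2, 4, 6],
}
```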
get_num_running_task_instances(session, same_dagrun=False)[source]
Return Number of running TIs from the DB
init_run_context(raw=False)[source]
Sets the log context.
static filter_for_tis(tis)[source]
Returns SQLAlchemy filter to query selected task instances
classmethod ti_selector_condition(vals)[source]
Build an SQLAlchemy filter for a list where each element can contain either a task_id or a tuple of (task_id, map_index)
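The mixed input format (plain task ids alongside (task_id, map_index) tuples) can be pictured with a pure-Python matcher. `split_selectors` and `matches` are hypothetical helpers. The real method builds an SQLAlchemy filter rather than evaluating the condition in Python.

```python
from typing import Iterable, List, Tuple, Union

Selector = Union[str, Tuple[str, int]]


def split_selectors(vals: Iterable[Selector]) -> Tuple[List[str], List[Tuple[str, int]]]:
    """Separate bare task ids from (task_id, map_index) pairs."""
    task_ids: List[str] = []
    pairs: List[Tuple[str, int]] = []
    for val in vals:
        if isinstance(val, str):
            task_ids.append(val)
        else:
            task_id, map_index = val
            pairs.append((task_id, map_index))
    return task_ids, pairs


def matches(task_id: str, map_index: int, vals: Iterable[Selector]) -> bool:
    """A bare id selects every map index; a pair selects exactly one."""
    task_ids, pairs = split_selectors(vals)
    return task_id in task_ids or (task_id, map_index) in pairs
```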
schedule_downstream_tasks(session=NEW_SESSION)[source]
The mini-scheduler for scheduling downstream tasks of this task instance.
get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]
Infer the map indexes of an upstream “relevant” to this ti.
The bulk of the logic mainly exists to solve the problem described by the following example, where ‘val’ must resolve to different values, depending on where the reference is being used:
    @task
    def this_task(v):  # This is self.task.
        return v * 2

    @task_group
    def tg1(inp):
        val = upstream(inp)  # This is the upstream task.
        this_task(val)  # When inp is 1, val here should resolve to 2.
        return val

    # This val is the same object returned by tg1.
    val = tg1.expand(inp=[1, 2, 3])

    @task_group
    def tg2(inp):
        another_task(inp, val)  # val here should resolve to [2, 4, 6].

    tg2.expand(inp=["a", "b"])
The surrounding mapped task groups of upstream and self.task are inspected to find a common “ancestor”. If such an ancestor is found, we need to return specific map indexes to pull a partial value from upstream XCom.
Parameters
upstream (airflow.models.operator.Operator) – The referenced upstream task.
ti_count (int | None) – The total count of task instances this task was expanded into by the scheduler, i.e. expanded_ti_count in the template context.
Returns
Specific map index or map indexes to pull, or None if we want the “whole” return value (i.e. no mapped task groups involved).
Return type
clear_db_references(session)[source]
Clear DB references to XCom, TaskFail and RenderedTaskInstanceFields.
Parameters
session – ORM Session
airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]
Simplified Task Instance.
Used to send data between processes via Queues.
__eq__(other)[source]
Return self==value.
as_dict()[source]
classmethod from_ti(ti)[source]
classmethod from_dict(obj_dict)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]
Bases: airflow.models.base.Base
For storage of arbitrary notes concerning the task instance.