airflow.models.dag
Module Contents
Classes
DAG | A dag (directed acyclic graph) is a collection of tasks with directional dependencies.
DagTag | A tag name per dag, to allow quick filtering in the DAG view.
DagOwnerAttributes | Table defining different owner attributes. For example, a link for an owner that will be passed as a hyperlink to the DAGs view.
DagModel | Table containing DAG properties
DagContext | DAG context is used to keep the current DAG when DAG is used as ContextManager.
Functions
create_timetable(interval, timezone) | Create a Timetable instance from a schedule_interval argument.
get_last_dagrun(dag_id, session[, …]) | Returns the last dag run for a dag, None if there was none. |
get_dataset_triggered_next_run_info(dag_ids, *, session) | Given a list of dag_ids, get a string representing how close any that are dataset triggered are to their next run.
dag([dag_id, description, schedule, …]) | Python dag decorator. Wraps a function into an Airflow DAG. |
Attributes
airflow.models.dag.log[source]
airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][source]
airflow.models.dag.ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT'][source]
airflow.models.dag.TAG_MAX_LEN = 100[source]
airflow.models.dag.DagStateChangeCallback[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.ScheduleIntervalArg[source]
airflow.models.dag.ScheduleArg[source]
airflow.models.dag.SLAMissCallback[source]
airflow.models.dag.DEFAULT_SCHEDULE_INTERVAL[source]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]
Bases: airflow.exceptions.AirflowException
Exception raised when a model populates data interval fields incorrectly.
The data interval fields should either both be None (for runs scheduled prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is implemented). This is raised if exactly one of the fields is None.
__str__()[source]
Return str(self).
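The consistency rule described for InconsistentDataInterval can be illustrated with a minimal sketch. The helper name `data_interval_is_consistent` is hypothetical and is not part of Airflow; it only restates the "both None or both datetime" condition from the description above.

```python
from datetime import datetime
from typing import Optional


def data_interval_is_consistent(
    start: Optional[datetime], end: Optional[datetime]
) -> bool:
    """Return True when both fields are None or both are set (hypothetical helper)."""
    # The inconsistent case is exactly one of the two fields being None.
    return (start is None) == (end is None)


# Both unset (run scheduled prior to AIP-39) or both set are fine.
print(data_interval_is_consistent(None, None))
print(data_interval_is_consistent(datetime(2021, 1, 1), datetime(2021, 1, 2)))
# Exactly one field set is the situation the exception reports.
print(data_interval_is_consistent(None, datetime(2021, 1, 2)))
```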
airflow.models.dag.create_timetable(interval, timezone)[source]
Create a Timetable instance from a schedule_interval argument.
airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]
Returns the last dag run for a dag, None if there was none. Last dag run can be any type of run e.g. scheduled or backfilled. Overridden DagRuns are ignored.
airflow.models.dag.get_dataset_triggered_next_run_info(dag_ids, *, session)[source]
Given a list of dag_ids, get a string representing how close any that are dataset triggered are to their next run, e.g. “1 of 2 datasets updated”
class airflow.models.dag.DAG(dag_id, description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True)[source]
Bases: airflow.utils.log.logging_mixin.LoggingMixin
A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule (say daily or hourly), the DAG needs to run each individual task as its dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can’t run until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.
Note that if you plan to use time zones all the dates provided should be pendulum dates. See Time zone aware DAGs.
New in version 2.4: The schedule argument to specify either time-based scheduling logic (timetable), or dataset-driven triggers.
Deprecated since version 2.4: The arguments schedule_interval and timetable. Their functionalities are merged into the new schedule argument.
Parameters
dag_id (str) – The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)
description (str | None) – The description for the DAG to e.g. be shown on the webserver
schedule (ScheduleArg) – Defines the rules according to which DAG runs are scheduled. Can accept cron string, timedelta object, Timetable, or list of Dataset objects. See also Customizing DAG Scheduling with Timetables.
start_date (datetime | None) – The timestamp from which the scheduler will attempt to backfill
end_date (datetime | None) – A date beyond which your DAG won’t run, leave to None for open-ended scheduling
template_searchpath (str | Iterable[str] | None) – This list of folders (non-relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default
template_undefined (type[jinja2.StrictUndefined]) – Template undefined type.
user_defined_macros (dict | None) – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.
user_defined_filters (dict | None) – a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in all jinja templates related to this DAG.
default_args (dict | None) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator’s call default_args, the actual value will be False.
params (collections.abc.MutableMapping | None) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
max_active_tasks (int) – the number of task instances allowed to run concurrently
max_active_runs (int) – maximum number of active DAG runs; beyond this number of DAG runs in a running state, the scheduler won’t create new active DAG runs
dagrun_timeout (timedelta | None) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created.
sla_miss_callback (None | SLAMissCallback | list[SLAMissCallback]) – specify a function or list of functions to call when reporting SLA timeouts. See sla_miss_callback for more information about the function signature and parameters that are passed to the callback.
default_view (str) – Specify DAG default view (grid, graph, duration, gantt, landing_times), default grid
orientation (str) – Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
catchup (bool) – Perform scheduler catchup (or only run latest)? Defaults to True
on_failure_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
on_success_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – Much like the on_failure_callback except that it is executed when the dag succeeds.
access_control (dict | None) – Specify optional DAG-level actions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}"
is_paused_upon_creation (bool | None) – Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used.
jinja_environment_kwargs (dict | None) – additional configuration options to be passed to Jinja Environment for template rendering.
Example: to prevent Jinja from removing a trailing newline from template strings
DAG(
    dag_id='my-dag',
    jinja_environment_kwargs={
        'keep_trailing_newline': True,
        # some other jinja2 Environment options here
    },
)
render_template_as_native_obj (bool) – If True, uses a Jinja NativeEnvironment to render templates as native Python types. If False, a Jinja Environment is used to render templates as string values.
tags (list[str] | None) – List of tags to help filtering DAGs in the UI.
owner_links (dict[str, str] | None) – Dict of owners and their links, that will be clickable on the DAGs view UI. Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: {"dag_owner": "https://airflow.apache.org/"}
auto_register (bool) – Automatically register this DAG when it is used in a with block
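The default_args precedence described in the parameter list (an explicit operator argument wins over the DAG-level default) can be sketched with plain dictionaries. This is an illustration only, not how Airflow applies defaults internally; `resolve_operator_kwargs` is a hypothetical helper and the `retries` key is just sample data.

```python
def resolve_operator_kwargs(dag_default_args: dict, operator_kwargs: dict) -> dict:
    """Merge DAG-level defaults with arguments given on the operator call."""
    # Later keys win in a dict merge, so explicit operator arguments
    # override the DAG-level defaults.
    return {**dag_default_args, **operator_kwargs}


dag_default_args = {"depends_on_past": True, "retries": 2}
resolved = resolve_operator_kwargs(dag_default_args, {"depends_on_past": False})
print(resolved)  # {'depends_on_past': False, 'retries': 2}
```

Values the operator does not set (here `retries`) still come from the DAG-level dictionary.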
property access_control[source]
property tasks: list[airflow.models.operator.Operator][source]
property task_group: airflow.utils.task_group.TaskGroup[source]
property relative_fileloc: pathlib.Path[source]
File location of the importable dag ‘file’ relative to the configured DAGs folder.
-
Folder location of where the DAG object is instantiated.
-
Return list of all owners found in DAG tasks.
Returns
Comma separated list of owners in DAG tasks
Return type
property concurrency_reached[source]
This attribute is deprecated. Please use airflow.models.DAG.get_concurrency_reached method.
property is_paused[source]
This attribute is deprecated. Please use airflow.models.DAG.get_is_paused method.
property normalized_schedule_interval: ScheduleInterval[source]
property latest_execution_date[source]
This attribute is deprecated. Please use airflow.models.DAG.get_latest_execution_date.
property subdags[source]
Returns a list of the subdag objects associated to this DAG
property roots: list[airflow.models.operator.Operator][source]
Return nodes with no parents. These are first to execute and are called roots or root nodes.
property leaves: list[airflow.models.operator.Operator][source]
Return nodes with no children. These are last to execute and are called leaves or leaf nodes.
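As a rough illustration of what roots and leaves mean, the sketch below computes both from a plain mapping of task id to downstream task ids. The mapping, the task names, and the function `roots_and_leaves` are assumptions made for the example; the real properties return operator objects rather than strings.

```python
from typing import Dict, List, Set, Tuple


def roots_and_leaves(downstream: Dict[str, Set[str]]) -> Tuple[List[str], List[str]]:
    """Return (roots, leaves) for a graph given as task_id -> downstream task ids."""
    # Every task that appears as somebody's child has at least one parent.
    has_parent = {child for children in downstream.values() for child in children}
    all_tasks = set(downstream) | has_parent
    roots = sorted(t for t in all_tasks if t not in has_parent)
    # A leaf has no downstream tasks at all.
    leaves = sorted(t for t in all_tasks if not downstream.get(t))
    return roots, leaves


edges = {
    "extract": {"transform"},
    "transform": {"load", "report"},
    "load": set(),
    "report": set(),
}
roots, leaves = roots_and_leaves(edges)
print(roots)   # ['extract']
print(leaves)  # ['load', 'report']
```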
property task: airflow.decorators.TaskDecoratorCollection[source]
-
File path that needs to be imported to load this DAG or subdag.
This may not be an actual file on disk in the case when this DAG is loaded from a ZIP file or other DAG distribution format.
get_doc_md(doc_md)[source]
validate()[source]
Validate the DAG has a coherent setup.
This is called by the DAG bag before bagging the DAG.
__repr__()[source]
Return repr(self).
__eq__(other)[source]
Return self==value.
__ne__(other)[source]
Return self!=value.
__lt__(other)[source]
Return self<value.
__hash__()[source]
Return hash(self).
__enter__()[source]
__exit__(_type, _value, _tb)[source]
date_range(start_date, num=None, end_date=None)[source]
is_fixed_time_schedule()[source]
following_schedule(dttm)[source]
Calculates the following schedule for this dag in UTC.
Parameters
dttm – utc datetime
Returns
utc datetime
previous_schedule(dttm)[source]
get_next_data_interval(dag_model)[source]
Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG’s schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended on as a part of the Python API.
get_run_data_interval(run)[source]
Get the data interval of this run.
For compatibility, this method infers the data interval from the DAG’s schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended on as a part of the Python API.
infer_automated_data_interval(logical_date)[source]
Infer a data interval for a run against this DAG.
This method is used to bridge runs created prior to AIP-39 implementation, which do not have an explicit data interval. Therefore, this method only considers schedule_interval values valid prior to Airflow 2.2.
DO NOT use this method if there is a known data interval.
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]
Get information about the next DagRun of this dag after date_last_automated_dagrun.
This calculates what time interval the next DagRun should operate on (its execution date) and when it can be scheduled, according to the dag’s timetable, start_date, end_date, etc. This doesn’t check max active run or any other “max_active_tasks” type limits, but only performs calculations based on the various date and interval fields of this dag and its tasks.
Parameters
last_automated_dagrun (None | datetime | DataInterval) – The max(execution_date) of existing “automated” DagRuns for this dag (scheduled or backfill, but not manual).
restricted (bool) – If set to False (default is True), ignore start_date, end_date, and catchup specified on the DAG or tasks.
Returns
DagRunInfo of the next dagrun, or None if a dagrun is not going to be scheduled.
Return type
DagRunInfo | None
next_dagrun_after_date(date_last_automated_dagrun)[source]
iter_dagrun_infos_between(earliest, latest, *, align=True)[source]
Yield DagRunInfo using this DAG’s timetable between given interval.
DagRunInfo instances are yielded if their logical_date is not earlier than earliest, nor later than latest. The instances are ordered by their logical_date from earliest to latest.
If align is False, the first run will happen immediately on earliest, even if it does not fall on the logical timetable schedule. The default is True, but subdags will ignore this value and always behave as if this is set to False for backward compatibility.
Example: A DAG is scheduled to run every midnight (0 0 * * *). If earliest is 2021-06-03 23:00:00, the first DagRunInfo would be 2021-06-03 23:00:00 if align=False, and 2021-06-04 00:00:00 if align=True.
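The midnight example can be reproduced with a small standalone sketch. It hard-codes a daily midnight schedule instead of using a Timetable, and `daily_midnight_runs` is a hypothetical function. Only the behaviour of the first yielded date is taken from the description; how later runs follow an unaligned first run is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from typing import Iterator


def daily_midnight_runs(
    earliest: datetime, latest: datetime, align: bool = True
) -> Iterator[datetime]:
    """Yield logical dates for a hypothetical '0 0 * * *' schedule."""
    midnight = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
    # First scheduled midnight that is not earlier than `earliest`.
    first_aligned = midnight if midnight == earliest else midnight + timedelta(days=1)
    if not align and earliest != first_aligned and earliest <= latest:
        # Unaligned: the first run happens immediately on `earliest`.
        yield earliest
    current = first_aligned
    while current <= latest:
        yield current
        current += timedelta(days=1)


earliest = datetime(2021, 6, 3, 23, 0, 0)
latest = datetime(2021, 6, 6)
print(next(daily_midnight_runs(earliest, latest, align=False)))  # 2021-06-03 23:00:00
print(next(daily_midnight_runs(earliest, latest, align=True)))   # 2021-06-04 00:00:00
```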
get_run_dates(start_date, end_date=None)[source]
Returns a list of dates between the interval received as parameter using this dag’s schedule interval. Returned dates can be used for execution dates.
Parameters
start_date – The start date of the interval.
end_date – The end date of the interval. Defaults to timezone.utcnow().
Returns
A list of dates within the interval following the dag’s schedule.
Return type
normalize_schedule(dttm)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_externally_triggered=True)[source]
param(name, default=NOTSET)[source]
Return a DagParam object for current dag.
Parameters
name (str) – dag parameter name.
default (Any) – fallback value for dag parameter.
Returns
DagParam instance for specified name and current dag.
Return type
get_concurrency_reached(session=NEW_SESSION)[source]
Returns a boolean indicating whether the max_active_tasks limit for this DAG has been reached
get_is_active(session=NEW_SESSION)[source]
Returns a boolean indicating whether this DAG is active
get_is_paused(session=NEW_SESSION)[source]
Returns a boolean indicating whether this DAG is paused
handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]
Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a ‘reason’, primarily to differentiate DagRun failures.
Parameters
dagrun – DagRun object
success – Flag to specify if failure or success callback should be called
reason – Completion reason
session – Database session
get_active_runs()[source]
Returns a list of dag run execution dates currently running
Returns
List of execution dates
get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)[source]
Returns the number of active “running” dag runs
Parameters
external_trigger – True for externally triggered active dag runs
session –
Returns
number greater than 0 for active dag runs
get_dagrun(execution_date=None, run_id=None, session=NEW_SESSION)[source]
Returns the dag run for a given execution date or run_id if it exists, otherwise None.
Parameters
execution_date (datetime | None) – The execution date of the DagRun to find.
run_id (str | None) – The run_id of the DagRun to find.
session (sqlalchemy.orm.session.Session) –
Returns
The DagRun if found, otherwise None.
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]
Returns the list of dag runs between start_date (inclusive) and end_date (inclusive).
Parameters
start_date – The starting execution date of the DagRun to find.
end_date – The ending execution date of the DagRun to find.
session –
Returns
The list of DagRuns found.
get_latest_execution_date(session=NEW_SESSION)[source]
Returns the latest date for which at least one dag run exists
resolve_template_files()[source]
get_template_env(*, force_sandboxed=False)[source]
Build a Jinja2 environment.
set_dependency(upstream_task_id, downstream_task_id)[source]
Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()
get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]
Get num task instances before (including) base_date.
The returned list may contain exactly num task instances corresponding to any DagRunType. It can have fewer if there are fewer than num scheduled DAG runs before base_date.
get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]
Set the state of a TaskInstance to the given state, and clear its downstream tasks that are in failed or upstream_failed state.
Parameters
task_id (str) – Task ID of the TaskInstance
map_indexes (Collection[int] | None) – Only set TaskInstance if its map_index matches. If None (default), all mapped TaskInstances of the task are set.
execution_date (datetime | None) – Execution date of the TaskInstance
run_id (str | None) – The run_id of the TaskInstance
state (airflow.utils.state.TaskInstanceState) – State to set the TaskInstance to
upstream (bool) – Include all upstream tasks of the given task_id
downstream (bool) – Include all downstream tasks of the given task_id
future (bool) – Include all future TaskInstances of the given task_id
commit (bool) – Commit changes
past (bool) – Include all past TaskInstances of the given task_id
topological_sort(include_subdag_tasks=False)[source]
Sorts tasks in topological order, such that a task comes after any of its upstream dependencies.
Deprecated in place of task_group.topological_sort
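The ordering guarantee (a task comes after all of its upstream dependencies) can be illustrated with the standard library's graphlib module, available in Python 3.9 and later. This is not Airflow's implementation; the task names and the `sort_tasks` helper are assumptions for the example.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def sort_tasks(upstream: Dict[str, Set[str]]) -> List[str]:
    """Order task ids so each one appears after all of its upstream tasks."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    return list(TopologicalSorter(upstream).static_order())


upstream = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}
order = sort_tasks(upstream)
print(order)  # "extract" first, then "transform", then "load" and "report" in either order
```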
set_dag_runs_state(state=State.RUNNING, session=NEW_SESSION, start_date=None, end_date=None, dag_ids=[])[source]
clear(task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state=DagRunState.QUEUED, dry_run=False, session=NEW_SESSION, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids=frozenset())[source]
Clears a set of task instances associated with the current dag for a specified date range.
Parameters
task_ids (Collection[str | tuple[str, int]] | None) – List of task ids or (task_id, map_index) tuples to clear
start_date (datetime | None) – The minimum execution_date to clear
end_date (datetime | None) – The maximum execution_date to clear
only_failed (bool) – Only clear failed tasks
only_running (bool) – Only clear running tasks.
confirm_prompt (bool) – Ask for confirmation
include_subdags (bool) – Clear tasks in subdags and clear external tasks indicated by ExternalTaskMarker
include_parentdag (bool) – Clear tasks in the parent dag of the subdag.
dag_run_state (airflow.utils.state.DagRunState) – state to set DagRun to. If set to False, dagrun state will not be changed.
dry_run (bool) – Find the tasks to clear but don’t clear them.
session (sqlalchemy.orm.session.Session) – The sqlalchemy session to use
dag_bag (DagBag | None) – The DagBag used to find the DAG’s subdags (Optional)
exclude_task_ids (frozenset[str] | frozenset[tuple[str, int]] | None) – A set of task_id or (task_id, map_index) tuples that should not be cleared
classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
__deepcopy__(memo)[source]
sub_dag(*args, **kwargs)[source]
This method is deprecated in favor of partial_subset
partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]
Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.
Parameters
task_ids_or_regex (str | re.Pattern | Iterable[str]) – Either a list of task_ids, or a regex to match against task ids (as a string, or compiled regex pattern).
include_downstream – Include all downstream tasks of matched tasks, in addition to matched tasks.
include_upstream – Include all upstream tasks of matched tasks, in addition to matched tasks.
include_direct_upstream – Include all tasks directly upstream of matched and downstream (if include_downstream = True) tasks
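To make the selection rule concrete, here is a simplified sketch of choosing tasks by regex and then pulling in everything upstream of the matches. It works on task ids only and does not deep-copy anything, unlike the method described above. `select_subset`, the use of `re.search`, and the sample graph are all assumptions for illustration.

```python
import re
from typing import Dict, Set


def select_subset(
    upstream: Dict[str, Set[str]], pattern: str, include_upstream: bool = True
) -> Set[str]:
    """Pick task ids matching `pattern`, optionally with all of their upstream tasks."""
    regex = re.compile(pattern)
    matched = {task_id for task_id in upstream if regex.search(task_id)}
    selected = set(matched)
    if include_upstream:
        # Walk the upstream edges transitively from every matched task.
        stack = list(matched)
        while stack:
            for parent in upstream[stack.pop()]:
                if parent not in selected:
                    selected.add(parent)
                    stack.append(parent)
    return selected


graph = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}
print(sorted(select_subset(graph, r"^load$")))  # ['extract', 'load', 'transform']
print(sorted(select_subset(graph, r"^load$", include_upstream=False)))  # ['load']
```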
has_task(task_id)[source]
has_task_group(task_group_id)[source]
task_group_dict()[source]
get_task(task_id, include_subdags=False)[source]
pickle_info()[source]
pickle(session=NEW_SESSION)[source]
tree_view()[source]
Print an ASCII tree representation of the DAG.
add_task(task)[source]
Add a task to the DAG
Parameters
task (airflow.models.operator.Operator) – the task you want to add
add_tasks(tasks)[source]
Add a list of tasks to the DAG
Parameters
tasks (Iterable[airflow.models.operator.Operator]) – a list of tasks you want to add
run(start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False, disable_retry=False)[source]
Runs the DAG.
Parameters
start_date – the start date of the range to run
end_date – the end date of the range to run
mark_success – True to mark jobs as succeeded without running them
local – True to run the tasks using the LocalExecutor
executor – The executor instance to run the tasks
donot_pickle – True to avoid pickling DAG object and send to workers
ignore_task_deps – True to skip upstream tasks
ignore_first_depends_on_past – True to ignore depends_on_past dependencies for the first set of tasks only
pool – Resource pool to use
delay_on_limit_secs – Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached
verbose – Make logging output more verbose
conf – user defined dictionary passed from CLI
rerun_failed_tasks –
run_backwards –
run_at_least_once – If true, always run the DAG at least once even if no logical run exists within the time range.
cli()[source]
Exposes a CLI specific to this DAG
test(execution_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, session=NEW_SESSION)[source]
Execute one single DagRun for a given DAG and execution date.
Parameters
execution_date (datetime | None) – execution date for the DAG run
run_conf (dict[str, Any] | None) – configuration to pass to newly created dagrun
conn_file_path (str | None) – file path to a connection file in either yaml or json
variable_file_path (str | None) – file path to a variable file in either yaml or json
session (sqlalchemy.orm.session.Session) – database connection (optional)
create_dagrun(state, execution_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_hash=None, creating_job_id=None, data_interval=None)[source]
Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.
Parameters
run_id (str | None) – defines the run id for this dag run
run_type (DagRunType | None) – type of DagRun
execution_date (datetime | None) – the execution date of this dag run
state (airflow.utils.state.DagRunState) – the state of the dag run
start_date (datetime | None) – the date this dag run should be evaluated
external_trigger (bool | None) – whether this dag run is externally triggered
conf (dict | None) – Dict containing configuration/parameters to pass to the DAG
creating_job_id (int | None) – id of the job creating this DagRun
session (sqlalchemy.orm.session.Session) – database session
dag_hash (str | None) – Hash of Serialized DAG
data_interval (tuple[datetime, datetime] | None) – Data interval of the DagRun
classmethod bulk_sync_to_db(dags, session=NEW_SESSION)[source]
This method is deprecated in favor of bulk_write_to_db
classmethod bulk_write_to_db(dags, processor_subdir=None, session=NEW_SESSION)[source]
Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including calculated fields.
Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
Parameters
dags (Collection[DAG]) – the DAG objects to save to the DB
Returns
None
sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]
Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
Returns
None
get_default_view()[source]
This is only there for backward compatible jinja2 templates
static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]
Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM
Parameters
active_dag_ids – list of DAG IDs that are active
Returns
None
static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]
Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.
Parameters
expiration_date – set inactive DAGs that were touched before this time
Returns
None
static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]
Returns the number of task instances in the given DAG.
Parameters
session – ORM session
dag_id – ID of the DAG to get the task concurrency of
run_id – ID of the DAG run to get the task concurrency of
task_ids – A list of valid task IDs for the given DAG
states – A list of states to filter by if supplied
Returns
The number of running tasks
Return type
classmethod get_serialized_fields()[source]
Stringified DAGs and operators contain exactly these fields.
get_edge_info(upstream_task_id, downstream_task_id)[source]
Returns edge information for the given pair of tasks if present, and an empty edge if there is no information.
set_edge_info(upstream_task_id, downstream_task_id, info)[source]
Sets the given edge information on the DAG. Note that this will overwrite, rather than merge with, existing info.
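The overwrite-not-merge behaviour of the two edge-info methods can be sketched with a nested dictionary. `EdgeInfoStore` is a hypothetical stand-in, and representing an "empty edge" as an empty dict is an assumption of this example.

```python
class EdgeInfoStore:
    """Hypothetical stand-in that keeps per-edge info in a nested dict."""

    def __init__(self) -> None:
        self._info: dict = {}

    def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: dict) -> None:
        # Replace whatever was stored for this edge; nothing is merged.
        self._info.setdefault(upstream_task_id, {})[downstream_task_id] = info

    def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> dict:
        # Fall back to an empty dict when the edge has no stored info.
        return self._info.get(upstream_task_id, {}).get(downstream_task_id, {})


store = EdgeInfoStore()
store.set_edge_info("a", "b", {"label": "first"})
store.set_edge_info("a", "b", {"color": "red"})
print(store.get_edge_info("a", "b"))  # {'color': 'red'}
print(store.get_edge_info("a", "c"))  # {}
```

After the second call the `label` key is gone, which is the difference between overwriting and merging.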
validate_schedule_and_params()[source]
Validates the DAG and raises an exception if any Params in the DAG have neither a default value nor null in their schema['type'] list while the DAG has a schedule_interval which is not None.
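The condition above can be sketched over plain dictionaries. The param layout used here (a `default` key and a `schema` dict) and the function `find_blocking_params` are assumptions for illustration; the sketch returns the offending names instead of raising.

```python
from typing import Dict, List


def find_blocking_params(params: Dict[str, dict], has_schedule: bool) -> List[str]:
    """Return names of params that would make a scheduled DAG invalid."""
    if not has_schedule:
        # Without a schedule, required params can be supplied at trigger time.
        return []
    blocking = []
    for name, spec in params.items():
        allowed_types = spec.get("schema", {}).get("type", [])
        if isinstance(allowed_types, str):
            allowed_types = [allowed_types]
        # A param blocks scheduling if it has no default and may not be null.
        if "default" not in spec and "null" not in allowed_types:
            blocking.append(name)
    return blocking


params = {
    "with_default": {"default": 1, "schema": {"type": "integer"}},
    "nullable": {"schema": {"type": ["string", "null"]}},
    "required": {"schema": {"type": "string"}},
}
print(find_blocking_params(params, has_schedule=True))   # ['required']
print(find_blocking_params(params, has_schedule=False))  # []
```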
iter_invalid_owner_links()[source]
Parses a given link, and verifies if it’s a valid URL, or a ‘mailto’ link. Returns an iterator of invalid (owner, link) pairs.
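A minimal sketch of this kind of link check, using urllib.parse from the standard library. The exact validity rule here (a mailto scheme, or a scheme plus a network location) is an assumption, and `iter_invalid_links` is a hypothetical free function rather than the method itself.

```python
from typing import Dict, Iterator, Tuple
from urllib.parse import urlsplit


def iter_invalid_links(owner_links: Dict[str, str]) -> Iterator[Tuple[str, str]]:
    """Yield (owner, link) pairs whose link is neither a URL nor a mailto link."""
    for owner, link in owner_links.items():
        parts = urlsplit(link)
        if parts.scheme == "mailto":
            continue  # treated as a valid mailto link in this sketch
        if parts.scheme and parts.netloc:
            continue  # looks like a URL with a scheme and a host
        yield owner, link


links = {
    "dag_owner": "https://airflow.apache.org/",
    "ops": "mailto:ops@example.com",
    "broken": "not a link",
}
invalid = list(iter_invalid_links(links))
print(invalid)  # [('broken', 'not a link')]
```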
class airflow.models.dag.DagTag[source]
Bases: airflow.models.base.Base
A tag name per dag, to allow quick filtering in the DAG view.
class airflow.models.dag.DagOwnerAttributes[source]
Bases: airflow.models.base.Base
Table defining different owner attributes. For example, a link for an owner that will be passed as a hyperlink to the DAGs view
__tablename__ = 'dag_owner_attributes'[source]
dag_id[source]
owner[source]
link[source]
__repr__()[source]
classmethod get_all(session)[source]
class airflow.models.dag.DagModel(concurrency=None, **kwargs)[source]
Bases: airflow.models.base.Base
Table containing DAG properties
property next_dagrun_data_interval: DataInterval | None[source]
property timezone[source]
property safe_dag_id[source]
property relative_fileloc: pathlib.Path | None[source]
File location of the importable dag ‘file’ relative to the configured DAGs folder.
__tablename__ = 'dag'[source]
These items are stored in the database for state related information
dag_id[source]
root_dag_id[source]
is_paused_at_creation[source]
is_paused[source]
is_subdag[source]
is_active[source]
last_parsed_time[source]
last_pickled[source]
last_expired[source]
scheduler_lock[source]
pickle_id[source]
fileloc[source]
processor_subdir[source]
owners[source]
description[source]
default_view[source]
schedule_interval[source]
timetable_description[source]
tags[source]
dag_owner_links[source]
max_active_tasks[source]
max_active_runs[source]
has_task_concurrency_limits[source]
has_import_errors[source]
next_dagrun[source]
next_dagrun_data_interval_start[source]
next_dagrun_data_interval_end[source]
next_dagrun_create_after[source]
__table_args__ = ()[source]
parent_dag[source]
schedule_dataset_references[source]
schedule_datasets[source]
task_outlet_dataset_references[source]
NUM_DAGS_PER_DAGRUN_QUERY[source]
__repr__()[source]
static get_dagmodel(dag_id, session=NEW_SESSION)[source]
classmethod get_current(dag_id, session=NEW_SESSION)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
get_is_paused(*, session=None)[source]
Provide interface compatibility to ‘DAG’.
static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]
Given a list of dag_ids, get a set of Paused Dag Ids
Parameters
session (sqlalchemy.orm.session.Session) – ORM Session
Returns
Paused Dag_ids
Return type
get_default_view()[source]
Get the Default DAG View, returns the default config value if DagModel does not have a value
set_is_paused(is_paused, including_subdags=True, session=NEW_SESSION)[source]
Pause/Un-pause a DAG.
classmethod deactivate_deleted_dags(alive_dag_filelocs, session=NEW_SESSION)[source]
Set is_active=False on the DAGs for which the DAG files have been removed.
classmethod dags_needing_dagruns(session)[source]
Return (and lock) a list of Dag objects that are due to create a new DagRun.
This will return a resultset of rows that is row-level-locked with a “SELECT … FOR UPDATE” query. You should ensure that any scheduling decisions are made in a single transaction, because the rows are unlocked as soon as the transaction is committed.
calculate_dagrun_date_fields(dag, most_recent_dag_run)[source]
Calculate next_dagrun and next_dagrun_create_after
Parameters
dag (DAG) – The DAG object
most_recent_dag_run (None | datetime | DataInterval) – DataInterval (or datetime) of most recent run of this dag, or none if not yet scheduled.
get_dataset_triggered_next_run_info(*, session=NEW_SESSION)[source]
airflow.models.dag.dag(dag_id='', description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True)[source]
Python dag decorator. Wraps a function into an Airflow DAG. Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
Parameters
dag_args – Arguments for DAG object
dag_kwargs – Kwargs for DAG object.
class airflow.models.dag.DagContext[source]
DAG context is used to keep the current DAG when DAG is used as ContextManager.
You can use DAG as context:
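from datetime import timedelta

from airflow import DAG

default_args = {"retries": 1}  # illustrative defaults, not from the original example

with DAG(
    dag_id="example_dag",
    default_args=default_args,
    schedule="0 0 * * *",
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    ...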
If you do this, the context stores the DAG, and whenever a new task is created, it uses the stored DAG as its parent DAG.
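The mechanism can be illustrated with a stripped-down context manager that keeps a stack of "current" objects. This is a simplified sketch, not Airflow's DagContext; `MiniDag`, `make_task`, and the task names are hypothetical.

```python
from typing import List, Optional


class MiniDag:
    """Hypothetical stand-in for a DAG that can be used in a with block."""

    _context_stack: List["MiniDag"] = []

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_ids: List[str] = []

    def __enter__(self) -> "MiniDag":
        # Entering the block makes this object the current one.
        MiniDag._context_stack.append(self)
        return self

    def __exit__(self, _type, _value, _tb) -> None:
        # Leaving the block restores whatever was current before.
        MiniDag._context_stack.pop()

    @classmethod
    def current(cls) -> Optional["MiniDag"]:
        return cls._context_stack[-1] if cls._context_stack else None


def make_task(task_id: str) -> str:
    """Create a 'task' and attach it to the current MiniDag, if there is one."""
    dag = MiniDag.current()
    if dag is not None:
        dag.task_ids.append(task_id)
    return task_id


with MiniDag("example_dag") as dag:
    make_task("extract")
    make_task("load")
make_task("orphan")  # created outside any block, so it is attached to nothing

print(dag.task_ids)  # ['extract', 'load']
```

A stack rather than a single variable lets nested with blocks restore the outer object on exit.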