Asynchronous I/O (asyncio)
Support for Python asyncio. Support for Core and ORM usage is included, using asyncio-compatible dialects.
New in version 1.4.
The asyncio extension requires at least Python version 3.6.
Note
The asyncio extension should be regarded as alpha level for the 1.4 release of SQLAlchemy. API details are subject to change at any time.
See also
Asynchronous IO Support for Core and ORM - initial feature announcement
Asyncio Integration - example scripts illustrating working examples of Core and ORM use within the asyncio extension.
Synopsis - Core
For Core use, the create_async_engine() function creates an instance of AsyncEngine, which then offers an async version of the traditional Engine API. The AsyncEngine delivers an AsyncConnection via its AsyncEngine.connect() and AsyncEngine.begin() methods, which both deliver asynchronous context managers. The AsyncConnection can then invoke statements using either the AsyncConnection.execute() method to deliver a buffered Result, or the AsyncConnection.stream() method to deliver a streaming server-side AsyncResult:
import asyncio

from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select

# table definition assumed by the statements below
meta = MetaData()
t1 = Table("t1", meta, Column("name", String(50), primary_key=True))

async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)
        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )
    async with engine.connect() as conn:
        # select a Result, which will be delivered with buffered
        # results
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())

asyncio.run(async_main())
Above, the AsyncConnection.run_sync() method may be used to invoke special DDL functions such as MetaData.create_all() that don’t include an awaitable hook.
The AsyncConnection also features a “streaming” API via the AsyncConnection.stream() method that returns an AsyncResult object. This result object uses a server-side cursor and provides an async/await API, such as an async iterator:
async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row, ))
Synopsis - ORM
Using 2.0 style querying, the AsyncSession class provides full ORM functionality. Within the default mode of use, special care must be taken to avoid lazy loading or other expired-attribute access involving ORM relationships and column attributes; the next section, Preventing Implicit IO when Using AsyncSession, details this. The example below illustrates a complete example including mapper and session configuration:
import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(String)
    create_date = Column(DateTime, server_default=func.now())
    bs = relationship("B")

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    __mapper_args__ = {"eager_defaults": True}


class B(Base):
    __tablename__ = "b"

    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    # expire_on_commit=False will prevent attributes from being expired
    # after commit.
    async_session = sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )

    async with async_session() as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        stmt = select(A).options(selectinload(A.bs))

        result = await session.execute(stmt)

        for a1 in result.scalars():
            print(a1)
            print(f"created at: {a1.create_date}")
            for b1 in a1.bs:
                print(b1)

        result = await session.execute(select(A).order_by(A.id))

        a1 = result.scalars().first()

        a1.data = "new data"

        await session.commit()

        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)


asyncio.run(async_main())
In the example above, the AsyncSession is instantiated using the optional sessionmaker helper, and associated with an AsyncEngine against a particular database URL. It is then used in a Python asynchronous context manager (i.e. an async with: statement) so that it is automatically closed at the end of the block; this is equivalent to calling the AsyncSession.close() method.
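The equivalence between the async with: block and an explicit AsyncSession.close() call can be sketched as below. This is a minimal sketch rather than anything taken from SQLAlchemy itself: run_unit_of_work and work are hypothetical names, and _StubSession is a stand-in object (exposing only an awaitable close()) so that the snippet runs without a database. With a real factory such as the async_session shown above, only the first function would be relevant.

```python
import asyncio


async def run_unit_of_work(session_factory, work):
    """Roughly what `async with session_factory() as session:` arranges."""
    session = session_factory()
    try:
        return await work(session)
    finally:
        # Runs on success and on error alike, as the context manager would.
        await session.close()


class _StubSession:
    """Hypothetical stand-in exposing only an awaitable close()."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def _demo():
    created = []

    def factory():
        created.append(_StubSession())
        return created[-1]

    async def work(session):
        # A real application would await session.execute(...) here.
        return "done"

    value = await run_unit_of_work(factory, work)
    return value, created[0].closed


demo_result = asyncio.run(_demo())
print(demo_result)
```

The try/finally form is mainly useful when the session's lifetime cannot be expressed as a single lexical block; otherwise the async with: form is shorter and harder to get wrong.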
Preventing Implicit IO when Using AsyncSession
Using traditional asyncio, the application needs to avoid any points at which IO-on-attribute access may occur. Above, the following measures are taken to prevent this:
* The selectinload() eager loader is employed in order to eagerly load the A.bs collection within the scope of the await session.execute() call:

      stmt = select(A).options(selectinload(A.bs))

  If the default loader strategy of “lazyload” were left in place, the access of the A.bs attribute would raise an asyncio exception. There are a variety of ORM loader options available, which may be configured at the default mapping level or used on a per-query basis, documented at Relationship Loading Techniques.

* The AsyncSession is configured using Session.expire_on_commit set to False, so that we may access attributes on an object subsequent to a call to AsyncSession.commit(), as in the line at the end where we access an attribute:

      # create AsyncSession with expire_on_commit=False
      async_session = AsyncSession(engine, expire_on_commit=False)

      # sessionmaker version
      async_session = sessionmaker(
          engine, expire_on_commit=False, class_=AsyncSession
      )

      async with async_session() as session:
          result = await session.execute(select(A).order_by(A.id))

          a1 = result.scalars().first()

          # commit would normally expire all attributes
          await session.commit()

          # access attribute subsequent to commit; this is what
          # expire_on_commit=False allows
          print(a1.data)
* The Column.server_default value on the create_date column will not be refreshed by default after an INSERT; instead, it is normally expired so that it can be loaded when needed. Similar behavior applies to a column where the Column.default parameter is assigned to a SQL expression object. To access this value with asyncio, it has to be refreshed within the flush process, which is achieved by setting the mapper.eager_defaults parameter on the mapping:

      class A(Base):
          # ...

          # column with a server_default, or SQL expression default
          create_date = Column(DateTime, server_default=func.now())

          # add this so that it can be accessed
          __mapper_args__ = {"eager_defaults": True}
Other guidelines include:
* Methods like AsyncSession.expire() should be avoided in favor of AsyncSession.refresh().
* Appropriate loader options should be employed for deferred() columns, if used at all, in addition to that of relationship() constructs as noted above. See Deferred Column Loading for background on deferred column loading.
* The “dynamic” relationship loader strategy described at Dynamic Relationship Loaders is not compatible with the asyncio approach and cannot be used, unless invoked within the AsyncSession.run_sync() method described at Running Synchronous Methods and Functions under asyncio.
Running Synchronous Methods and Functions under asyncio
Deep Alchemy
This approach essentially exposes publicly the mechanism by which SQLAlchemy is able to provide the asyncio interface in the first place. While there is no technical issue with doing so, the approach can probably be considered “controversial”, as it works against one of the central philosophies of the asyncio programming model: any programming statement that can potentially result in IO being invoked must have an await call, so that the program makes explicitly clear every line at which IO may occur. This approach does not change that general idea, except that it allows a series of synchronous IO instructions to be exempted from this rule within the scope of a function call, essentially bundled up into a single awaitable.
As an alternative means of integrating traditional SQLAlchemy “lazy loading” within an asyncio event loop, an optional method known as AsyncSession.run_sync() is provided which will run any Python function inside of a greenlet, where traditional synchronous programming concepts will be translated to use await when they reach the database driver. A hypothetical approach here is that an asyncio-oriented application packages up database-related methods into functions that are invoked using AsyncSession.run_sync().
Altering the above example, if we didn’t use selectinload() for the A.bs collection, we could accomplish our treatment of these attribute accesses within a separate function:
import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

# Base, A and B are the declarative base and mapped classes
# from the ORM synopsis above


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()


asyncio.run(async_main())
The above approach of running certain functions within a “sync” runner has some parallels to an application that runs a SQLAlchemy application on top of an event-based programming library such as gevent. The differences are as follows:
* Unlike when using gevent, we can continue to use the standard Python asyncio event loop, or any custom event loop, without the need to integrate into the gevent event loop.
* There is no “monkeypatching” whatsoever. The above example makes use of a real asyncio driver and the underlying SQLAlchemy connection pool is also using the Python built-in asyncio.Queue for pooling connections.
* The program can freely switch between async/await code and contained functions that use sync code with virtually no performance penalty. There is no “thread executor” or any additional waiters or synchronization in use.
* The underlying network drivers are also using pure Python asyncio concepts; no third-party networking libraries such as those that gevent and eventlet provide are in use.
Using multiple asyncio event loops
An application that makes use of multiple event loops, for example by combining asyncio with multithreading, should not share the same AsyncEngine with different event loops when using the default pool implementation.
If an AsyncEngine is to be passed from one event loop to another, the method AsyncEngine.dispose() should be called before it’s re-used on a new event loop. Failing to do so may lead to a RuntimeError along the lines of Task <Task pending ...> got Future attached to a different loop.
If the same engine must be shared between different loops, it should be configured to disable pooling using NullPool, preventing the Engine from using any connection more than once:
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
)
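The RuntimeError quoted in this section comes from asyncio itself rather than from SQLAlchemy: an awaitable created for one event loop cannot be awaited by a task running on another. The stdlib-only sketch below reproduces that rule, with a bare Future standing in for loop-bound state such as a pooled connection might hold. No SQLAlchemy objects are involved, and await_on_other_loop is a hypothetical helper name.

```python
import asyncio


def await_on_other_loop():
    """Await a Future bound to loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        future = loop_a.create_future()  # bound to loop_a

        async def waiter():
            return await future

        try:
            # The task created here belongs to loop_b, not loop_a.
            loop_b.run_until_complete(waiter())
        except RuntimeError as err:
            return str(err)
        return None
    finally:
        loop_a.close()
        loop_b.close()


message = await_on_other_loop()
print(message)
```

Disposing of the pool before moving to a new loop, or using NullPool as shown above, avoids carrying loop-bound state of this kind from one loop to the next.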
Engine API Documentation
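Object Name | Description |
---|---|
AsyncConnection | An asyncio proxy for a Connection. |
AsyncEngine | An asyncio proxy for an Engine. |
AsyncTransaction | An asyncio proxy for a Transaction. |
create_async_engine(*arg, **kw) | Create a new async engine instance. |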
function sqlalchemy.ext.asyncio.create_async_engine(*arg, **kw)
Create a new async engine instance.
Arguments passed to create_async_engine() are mostly identical to those passed to the create_engine() function. The specified dialect must be an asyncio-compatible dialect such as asyncpg.
New in version 1.4.
class sqlalchemy.ext.asyncio.AsyncEngine(sync_engine: sqlalchemy.future.engine.Engine)
An asyncio proxy for an Engine.
AsyncEngine is acquired using the create_async_engine() function:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncEngine(sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.AsyncConnectable)
method sqlalchemy.ext.asyncio.AsyncEngine.begin()
Return a context manager which when entered will deliver an AsyncConnection with an AsyncTransaction established.
E.g.:

async with async_engine.begin() as conn:
    await conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    await conn.execute(text("my_special_procedure(5)"))

method sqlalchemy.ext.asyncio.AsyncEngine.clear_compiled_cache()
Clear the compiled cache associated with the dialect.
Proxied for the Engine class on behalf of the AsyncEngine class.
This applies only to the built-in cache that is established via the create_engine.query_cache_size parameter. It will not impact any dictionary caches that were passed via the Connection.execution_options.query_cache parameter.
New in version 1.4.
method sqlalchemy.ext.asyncio.AsyncEngine.connect() → sqlalchemy.ext.asyncio.AsyncConnection
Return an AsyncConnection object.
The AsyncConnection will procure a database connection from the underlying connection pool when it is entered as an async context manager:

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))

The AsyncConnection may also be started outside of a context manager by invoking its AsyncConnection.start() method.
async method sqlalchemy.ext.asyncio.AsyncEngine.dispose()
Dispose of the connection pool used by this AsyncEngine.
This will close all connection pool connections that are currently checked in. See the documentation for the underlying Engine.dispose() method for further notes.
See also
Engine.dispose()
method sqlalchemy.ext.asyncio.AsyncEngine.execution_options(**opt)
Return a new AsyncEngine that will provide AsyncConnection objects with the given execution options.
Proxied from Engine.execution_options(). See that method for details.
method sqlalchemy.ext.asyncio.AsyncEngine.get_execution_options()
Get the non-SQL options which will take effect during execution.
Proxied for the Engine class on behalf of the AsyncEngine class.
async method sqlalchemy.ext.asyncio.AsyncEngine.raw_connection() → Any
Return a “raw” DBAPI connection from the connection pool.
method sqlalchemy.ext.asyncio.AsyncEngine.update_execution_options(**opt)
Update the default execution_options dictionary of this Engine.
Proxied for the Engine class on behalf of the AsyncEngine class.
The given keys/values in **opt are added to the default execution options that will be used for all connections. The initial contents of this dictionary can be sent via the execution_options parameter to create_engine().
class sqlalchemy.ext.asyncio.AsyncConnection(async_engine: sqlalchemy.ext.asyncio.engine.AsyncEngine, sync_connection: Optional[sqlalchemy.future.engine.Connection] = None)
An asyncio proxy for a Connection.
AsyncConnection is acquired using the AsyncEngine.connect() method of AsyncEngine:

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

async with engine.connect() as conn:
    result = await conn.execute(select(table))

New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncConnection(sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext, sqlalchemy.ext.asyncio.AsyncConnectable)
method sqlalchemy.ext.asyncio.AsyncConnection.begin() → sqlalchemy.ext.asyncio.AsyncTransaction
Begin a transaction prior to autobegin occurring.
method sqlalchemy.ext.asyncio.AsyncConnection.begin_nested() → sqlalchemy.ext.asyncio.AsyncTransaction
Begin a nested transaction and return a transaction handle.
async method sqlalchemy.ext.asyncio.AsyncConnection.close()
Close this AsyncConnection.
This has the effect of also rolling back the transaction if one is in place.
async method sqlalchemy.ext.asyncio.AsyncConnection.commit()
Commit the transaction that is currently in progress.
This method commits the current transaction if one has been started. If no transaction was started, the method has no effect, assuming the connection is in a non-invalidated state.
A transaction is begun on a Connection automatically whenever a statement is first executed, or when the Connection.begin() method is called.
attribute sqlalchemy.ext.asyncio.AsyncConnection.connection
Not implemented for async; call AsyncConnection.get_raw_connection().
async method sqlalchemy.ext.asyncio.AsyncConnection.exec_driver_sql(statement: sqlalchemy.sql.base.Executable, parameters: Optional[Mapping] = None, execution_options: Mapping = {}) → sqlalchemy.engine.Result
Execute a driver-level SQL string and return a buffered Result.
async method sqlalchemy.ext.asyncio.AsyncConnection.execute(statement: sqlalchemy.sql.base.Executable, parameters: Optional[Mapping] = None, execution_options: Mapping = {}) → sqlalchemy.engine.Result
Execute a SQL statement construct and return a buffered Result.
Parameters
statement – The statement to be executed. This is always an object that is in both the ClauseElement and Executable hierarchies, including: DDL and objects which inherit from DDLElement.
parameters – parameters which will be bound into the statement. This may be either a dictionary of parameter names to values, or a mutable sequence (e.g. a list) of dictionaries. When a list of dictionaries is passed, the underlying statement execution will make use of the DBAPI cursor.executemany() method. When a single dictionary is passed, the DBAPI cursor.execute() method will be used.
execution_options – optional dictionary of execution options, which will be associated with the statement execution. This dictionary can provide a subset of the options that are accepted by Connection.execution_options().
Returns
a Result object.
async method sqlalchemy.ext.asyncio.AsyncConnection.execution_options(**opt)
Set non-SQL options for the connection which take effect during execution.
This returns this AsyncConnection object with the new options added.
See Connection.execution_options() for full details on this method.
method sqlalchemy.ext.asyncio.AsyncConnection.get_nested_transaction()
Return an AsyncTransaction representing the current nested (savepoint) transaction, if any.
This makes use of the underlying synchronous connection’s Connection.get_nested_transaction() method to get the current Transaction, which is then proxied in a new AsyncTransaction object.
New in version 1.4.0b2.
async method sqlalchemy.ext.asyncio.AsyncConnection.get_raw_connection()
Return the pooled DBAPI-level connection in use by this AsyncConnection.
This is typically the SQLAlchemy connection-pool proxied connection which then has an attribute .connection that refers to the actual DBAPI-level connection.
method sqlalchemy.ext.asyncio.AsyncConnection.get_transaction()
Return an AsyncTransaction representing the current transaction, if any.
This makes use of the underlying synchronous connection’s Connection.get_transaction() method to get the current Transaction, which is then proxied in a new AsyncTransaction object.
New in version 1.4.0b2.
method sqlalchemy.ext.asyncio.AsyncConnection.in_nested_transaction()
Return True if a nested (savepoint) transaction is in progress.
New in version 1.4.0b2.
method sqlalchemy.ext.asyncio.AsyncConnection.in_transaction()
Return True if a transaction is in progress.
New in version 1.4.0b2.
attribute sqlalchemy.ext.asyncio.AsyncConnection.info
Return the Connection.info dictionary of the underlying Connection.
This dictionary is freely writable for user-defined state to be associated with the database connection.
This attribute is only available if the AsyncConnection is currently connected. If the AsyncConnection.closed attribute is True, then accessing this attribute will raise ResourceClosedError.
New in version 1.4.0b2.
async method sqlalchemy.ext.asyncio.AsyncConnection.invalidate(exception=None)
Invalidate the underlying DBAPI connection associated with this Connection.
See the method Connection.invalidate() for full detail on this method.
async method sqlalchemy.ext.asyncio.AsyncConnection.rollback()
Roll back the transaction that is currently in progress.
This method rolls back the current transaction if one has been started. If no transaction was started, the method has no effect. If a transaction was started and the connection is in an invalidated state, the transaction is cleared using this method.
A transaction is begun on a Connection automatically whenever a statement is first executed, or when the Connection.begin() method is called.
async method sqlalchemy.ext.asyncio.AsyncConnection.run_sync(fn: Callable, *arg, **kw) → Any
Invoke the given sync callable passing self as the first argument.
This method maintains the asyncio event loop all the way through to the database connection by running the given callable in a specially instrumented greenlet.
E.g.:

async with async_engine.begin() as conn:
    await conn.run_sync(metadata.create_all)

Note
The provided callable is invoked inline within the asyncio event loop, and will block on traditional IO calls. IO within this callable should only call into SQLAlchemy’s asyncio database APIs which will be properly adapted to the greenlet context.
async method sqlalchemy.ext.asyncio.AsyncConnection.scalar(statement: sqlalchemy.sql.base.Executable, parameters: Optional[Mapping] = None, execution_options: Mapping = {}) → Any
Execute a SQL statement construct and return a scalar object.
This method is shorthand for invoking the Result.scalar() method after invoking the Connection.execute() method. Parameters are equivalent.
Returns
a scalar Python value representing the first column of the first row returned.
async method sqlalchemy.ext.asyncio.AsyncConnection.start()
Start this AsyncConnection object’s context outside of using a Python async with: block.
async method sqlalchemy.ext.asyncio.AsyncConnection.stream(statement: sqlalchemy.sql.base.Executable, parameters: Optional[Mapping] = None, execution_options: Mapping = {}) → sqlalchemy.ext.asyncio.AsyncResult
Execute a statement and return a streaming AsyncResult object.
class sqlalchemy.ext.asyncio.AsyncTransaction(connection: sqlalchemy.ext.asyncio.engine.AsyncConnection, nested: bool = False)
An asyncio proxy for a Transaction.
Class signature
class sqlalchemy.ext.asyncio.AsyncTransaction(sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext)
async method sqlalchemy.ext.asyncio.AsyncTransaction.close()
Close this Transaction.
If this transaction is the base transaction in a begin/commit nesting, the transaction will rollback(). Otherwise, the method returns.
This is used to cancel a Transaction without affecting the scope of an enclosing transaction.
async method sqlalchemy.ext.asyncio.AsyncTransaction.commit()
Commit this Transaction.
async method sqlalchemy.ext.asyncio.AsyncTransaction.rollback()
Roll back this Transaction.
async method sqlalchemy.ext.asyncio.AsyncTransaction.start()
Start this AsyncTransaction object’s context outside of using a Python async with: block.
Result Set API Documentation
The AsyncResult object is an async-adapted version of the Result object. It is only returned when using the AsyncConnection.stream() or AsyncSession.stream() methods, which return a result object that is on top of an active database cursor.
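Object Name | Description |
---|---|
AsyncMappingResult | A wrapper for an AsyncResult that returns dictionary values rather than Row values. |
AsyncResult | An asyncio wrapper around a Result object. |
AsyncScalarResult | A wrapper for an AsyncResult that returns scalar values rather than Row values. |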
class sqlalchemy.ext.asyncio.AsyncResult(real_result)
An asyncio wrapper around a Result object.
The AsyncResult only applies to statement executions that use a server-side cursor. It is returned only from the AsyncConnection.stream() and AsyncSession.stream() methods.
New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncResult(sqlalchemy.ext.asyncio.AsyncCommon)
async method sqlalchemy.ext.asyncio.AsyncResult.all()
Return all rows in a list.
Closes the result set after invocation. Subsequent invocations will return an empty list.
Returns
a list of Row objects.
method sqlalchemy.ext.asyncio.AsyncResult.columns(*col_expressions)
Establish the columns that should be returned in each row.
Refer to Result.columns() in the synchronous SQLAlchemy API for a complete behavioral description.
async method sqlalchemy.ext.asyncio.AsyncResult.fetchmany(size=None)
Fetch many rows.
When all rows are exhausted, returns an empty list.
This method is provided for backwards compatibility with SQLAlchemy 1.x.x.
To fetch rows in groups, use the AsyncResult.partitions() method.
Returns
a list of Row objects.
async method sqlalchemy.ext.asyncio.AsyncResult.fetchone()
Fetch one row.
When all rows are exhausted, returns None.
This method is provided for backwards compatibility with SQLAlchemy 1.x.x.
To fetch the first row of a result only, use the AsyncResult.first() method. To iterate through all rows, iterate the AsyncResult object directly.
Returns
a Row object if no filters are applied, or None if no rows remain.
async method sqlalchemy.ext.asyncio.AsyncResult.first()
Fetch the first row or None if no row is present.
Closes the result set and discards remaining rows.
Note
This method returns one row, e.g. tuple, by default. To return exactly one single scalar value, that is, the first column of the first row, use the AsyncResult.scalar() method, or combine AsyncResult.scalars() and AsyncResult.first().
Returns
a Row object, or None if no rows remain.
async method sqlalchemy.ext.asyncio.AsyncResult.freeze()
Return a callable object that will produce copies of this AsyncResult when invoked.
The callable object returned is an instance of FrozenResult.
This is used for result set caching. The method must be called on the result when it has been unconsumed, and calling the method will consume the result fully. When the FrozenResult is retrieved from a cache, it can be called any number of times where it will produce a new Result object each time against its stored set of rows.
See also
Re-Executing Statements - example usage within the ORM to implement a result-set cache.
method sqlalchemy.ext.asyncio.AsyncResult.keys()
Return the Result.keys() collection from the underlying Result.
method sqlalchemy.ext.asyncio.AsyncResult.mappings()
Apply a mappings filter to returned rows, returning an instance of AsyncMappingResult.
When this filter is applied, fetching rows will return RowMapping objects instead of Row objects.
Refer to Result.mappings() in the synchronous SQLAlchemy API for a complete behavioral description.
Returns
a new AsyncMappingResult filtering object referring to the underlying Result object.
method sqlalchemy.ext.asyncio.AsyncResult.merge(*others)
Merge this AsyncResult with other compatible result objects.
The object returned is an instance of MergedResult, which will be composed of iterators from the given result objects.
The new result will use the metadata from this result object. The subsequent result objects must be against an identical set of result / cursor metadata, otherwise the behavior is undefined.
async method sqlalchemy.ext.asyncio.AsyncResult.one()
Return exactly one row or raise an exception.
Raises NoResultFound if the result returns no rows, or MultipleResultsFound if multiple rows would be returned.
Note
This method returns one row, e.g. tuple, by default. To return exactly one single scalar value, that is, the first column of the first row, use the AsyncResult.scalar_one() method, or combine AsyncResult.scalars() and AsyncResult.one().
New in version 1.4.
Returns
The first Row.
async method sqlalchemy.ext.asyncio.AsyncResult.one_or_none()
Return at most one result or raise an exception.
Returns None if the result has no rows. Raises MultipleResultsFound if multiple rows are returned.
New in version 1.4.
Returns
The first Row or None if no row is available.
method sqlalchemy.ext.asyncio.AsyncResult.partitions(size=None)
Iterate through sub-lists of rows of the size given.
An async iterator is returned:

async def scroll_results(connection):
    result = await connection.stream(select(users_table))

    async for partition in result.partitions(100):
        print("list of rows: %s" % partition)

async method sqlalchemy.ext.asyncio.AsyncResult.scalar()
Fetch the first column of the first row, and close the result set.
Returns None if there are no rows to fetch.
No validation is performed to test if additional rows remain.
After calling this method, the object is fully closed, e.g. the CursorResult.close() method will have been called.
Returns
a Python scalar value, or None if no rows remain.
async method sqlalchemy.ext.asyncio.AsyncResult.scalar_one()
Return exactly one scalar result or raise an exception.
This is equivalent to calling AsyncResult.scalars() and then AsyncResult.one().
async method sqlalchemy.ext.asyncio.AsyncResult.scalar_one_or_none()
Return exactly one or no scalar result.
This is equivalent to calling AsyncResult.scalars() and then AsyncResult.one_or_none().
method sqlalchemy.ext.asyncio.AsyncResult.scalars(index=0)
Return an AsyncScalarResult filtering object which will return single elements rather than Row objects.
Refer to Result.scalars() in the synchronous SQLAlchemy API for a complete behavioral description.
Parameters
index – integer or row key indicating the column to be fetched from each row, defaults to 0 indicating the first column.
Returns
a new AsyncScalarResult filtering object referring to this AsyncResult object.
method sqlalchemy.ext.asyncio.AsyncResult.unique(strategy=None)
Apply unique filtering to the objects returned by this AsyncResult.
Refer to Result.unique() in the synchronous SQLAlchemy API for a complete behavioral description.
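The row-count rules described for first(), one(), one_or_none() and scalar() can be compared side by side with a plain-Python model. This is only a sketch of the documented semantics over an in-memory list of tuples: it is not SQLAlchemy code, the functions are synchronous where the real methods are awaited and consume a server-side cursor, and the two exception classes are local stand-ins named after the SQLAlchemy exceptions mentioned above.

```python
class NoResultFound(Exception):
    """Local stand-in for the SQLAlchemy exception of the same name."""


class MultipleResultsFound(Exception):
    """Local stand-in for the SQLAlchemy exception of the same name."""


def first(rows):
    """First row or None; any remaining rows are simply discarded."""
    for row in rows:
        return row
    return None


def one_or_none(rows):
    """None for no rows, the row for exactly one, an error for more."""
    rows = list(rows)
    if len(rows) > 1:
        raise MultipleResultsFound("more than one row")
    return rows[0] if rows else None


def one(rows):
    """Exactly one row, otherwise an error."""
    rows = list(rows)
    if not rows:
        raise NoResultFound("no rows")
    if len(rows) > 1:
        raise MultipleResultsFound("more than one row")
    return rows[0]


def scalar(rows):
    """First column of the first row, or None; extra rows are not checked."""
    row = first(rows)
    return None if row is None else row[0]


rows = [(1, "a"), (2, "b")]
print(first(rows))
print(scalar(rows))
print(one_or_none([]))
```

The practical distinction is that first() and scalar() silently ignore additional rows, while one() and one_or_none() treat them as an error.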
class sqlalchemy.ext.asyncio.AsyncScalarResult(real_result, index)
A wrapper for an AsyncResult that returns scalar values rather than Row values.
The AsyncScalarResult object is acquired by calling the AsyncResult.scalars() method.
Refer to the ScalarResult object in the synchronous SQLAlchemy API for a complete behavioral description.
New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncScalarResult(sqlalchemy.ext.asyncio.AsyncCommon)
async method sqlalchemy.ext.asyncio.AsyncScalarResult.all()
Return all scalar values in a list.
Equivalent to AsyncResult.all() except that scalar values, rather than Row objects, are returned.
async method sqlalchemy.ext.asyncio.AsyncScalarResult.fetchall()
A synonym for the AsyncScalarResult.all() method.
async method sqlalchemy.ext.asyncio.AsyncScalarResult.fetchmany(size=None)
Fetch many objects.
Equivalent to AsyncResult.fetchmany() except that scalar values, rather than Row objects, are returned.
async method sqlalchemy.ext.asyncio.AsyncScalarResult.first()
Fetch the first object or None if no object is present.
Equivalent to AsyncResult.first() except that scalar values, rather than Row objects, are returned.
async method sqlalchemy.ext.asyncio.AsyncScalarResult.one()
Return exactly one object or raise an exception.
Equivalent to AsyncResult.one() except that scalar values, rather than Row objects, are returned.
async method sqlalchemy.ext.asyncio.AsyncScalarResult.one_or_none()
Return at most one object or raise an exception.
Equivalent to AsyncResult.one_or_none() except that scalar values, rather than Row objects, are returned.
method sqlalchemy.ext.asyncio.AsyncScalarResult.partitions(size=None)
Iterate through sub-lists of elements of the size given.
Equivalent to AsyncResult.partitions() except that scalar values, rather than Row objects, are returned.
method sqlalchemy.ext.asyncio.AsyncScalarResult.unique(strategy=None)
Apply unique filtering to the objects returned by this AsyncScalarResult.
See AsyncResult.unique() for usage details.
class sqlalchemy.ext.asyncio.AsyncMappingResult(result)
A wrapper for an AsyncResult that returns dictionary values rather than Row values.
The AsyncMappingResult object is acquired by calling the AsyncResult.mappings() method.
Refer to the MappingResult object in the synchronous SQLAlchemy API for a complete behavioral description.
New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncMappingResult (sqlalchemy.ext.asyncio.AsyncCommon)
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.all()
Return all mapping values in a list.
Equivalent to AsyncResult.all() except that mapping values, rather than Row objects, are returned.
method
sqlalchemy.ext.asyncio.AsyncMappingResult.columns(*col_expressions)
Establish the columns that should be returned in each row.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.fetchall()
A synonym for the AsyncMappingResult.all() method.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.fetchmany(size=None)
Fetch many objects.
Equivalent to AsyncResult.fetchmany() except that mapping values, rather than Row objects, are returned.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.fetchone()
Fetch one object.
Equivalent to AsyncResult.fetchone() except that mapping values, rather than Row objects, are returned.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.first()
Fetch the first object or None if no object is present.
Equivalent to AsyncResult.first() except that mapping values, rather than Row objects, are returned.
method
sqlalchemy.ext.asyncio.AsyncMappingResult.keys()
Return an iterable view which yields the string keys that would be represented by each Row.
The view also can be tested for key containment using the Python in operator, which will test both for the string keys represented in the view, as well as for alternate keys such as column objects.
Changed in version 1.4: a key view object is returned rather than a plain list.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.one()
Return exactly one object or raise an exception.
Equivalent to AsyncResult.one() except that mapping values, rather than Row objects, are returned.
async method
sqlalchemy.ext.asyncio.AsyncMappingResult.one_or_none()
Return at most one object or raise an exception.
Equivalent to AsyncResult.one_or_none() except that mapping values, rather than Row objects, are returned.
method
sqlalchemy.ext.asyncio.AsyncMappingResult.partitions(size=None)
Iterate through sub-lists of elements of the size given.
Equivalent to AsyncResult.partitions() except that mapping values, rather than Row objects, are returned.
method
sqlalchemy.ext.asyncio.AsyncMappingResult.unique(strategy=None)
Apply unique filtering to the objects returned by this AsyncMappingResult.
See AsyncResult.unique() for usage details.
ORM Session API Documentation
Object Name | Description |
---|---|
AsyncSession | Asyncio version of Session. |
AsyncSessionTransaction | A wrapper for the ORM SessionTransaction object. |
class sqlalchemy.ext.asyncio.AsyncSession(bind: Optional[sqlalchemy.ext.asyncio.engine.AsyncEngine] = None, binds: Optional[Mapping[object, sqlalchemy.ext.asyncio.engine.AsyncEngine]] = None, **kw)
Asyncio version of Session.
New in version 1.4.
method
sqlalchemy.ext.asyncio.AsyncSession.add(instance, _warn=True)
Place an object in the Session.
Proxied for the Session class on behalf of the AsyncSession class.
Its state will be persisted to the database on the next flush operation.
Repeated calls to add() will be ignored. The opposite of add() is expunge().
method
sqlalchemy.ext.asyncio.AsyncSession.add_all(instances)
Add the given collection of instances to this Session.
Proxied for the Session class on behalf of the AsyncSession class.
method
sqlalchemy.ext.asyncio.AsyncSession.begin(**kw)
Return an AsyncSessionTransaction object.
The underlying Session will perform the “begin” action when the AsyncSessionTransaction object is entered:
async with async_session.begin():
    # .. ORM transaction is begun
Note that database IO will not normally occur when the session-level transaction is begun, as database transactions begin on an on-demand basis. However, the begin block is async to accommodate a SessionEvents.after_transaction_create() event hook that may perform IO.
For a general description of ORM begin, see Session.begin().
method
sqlalchemy.ext.asyncio.AsyncSession.begin_nested(**kw)
Return an AsyncSessionTransaction object which will begin a “nested” transaction, e.g. SAVEPOINT.
Behavior is the same as that of AsyncSession.begin().
For a general description of ORM begin nested, see Session.begin_nested().
async method
sqlalchemy.ext.asyncio.AsyncSession.close()
Close this AsyncSession.
method
async classmethod sqlalchemy.ext.asyncio.AsyncSession.close_all()
Close all AsyncSession sessions.
async method
sqlalchemy.ext.asyncio.AsyncSession.commit()
Commit the current transaction in progress.
async method
sqlalchemy.ext.asyncio.AsyncSession.connection()
Return an AsyncConnection object corresponding to this Session object’s transactional state.
async method
sqlalchemy.ext.asyncio.AsyncSession.delete(instance)
Mark an instance as deleted.
The database delete operation occurs upon flush().
As this operation may need to cascade along unloaded relationships, it is awaitable to allow for those queries to take place.
attribute
sqlalchemy.ext.asyncio.AsyncSession.deleted
The set of all instances marked as ‘deleted’ within this Session.
Proxied for the Session class on behalf of the AsyncSession class.
attribute
sqlalchemy.ext.asyncio.AsyncSession.dirty
The set of all persistent instances considered dirty.
Proxied for the Session class on behalf of the AsyncSession class.
E.g.:
some_mapped_object in session.dirty
Instances are considered dirty when they were modified but not deleted.
Note that this ‘dirty’ calculation is ‘optimistic’; most attribute-setting or collection modification operations will mark an instance as ‘dirty’ and place it in this set, even if there is no net change to the attribute’s value. At flush time, the value of each attribute is compared to its previously saved value, and if there’s no net change, no SQL operation will occur (this is a more expensive operation so it’s only done at flush time).
To check if an instance has actionable net changes to its attributes, use the Session.is_modified() method.
async method
sqlalchemy.ext.asyncio.AsyncSession.execute(statement: sqlalchemy.sql.base.Executable, params: Optional[Mapping] = None, execution_options: Mapping = {}, bind_arguments: Optional[Mapping] = None, **kw) → sqlalchemy.engine.Result
Execute a statement and return a buffered Result object.
method
sqlalchemy.ext.asyncio.AsyncSession.expire(instance, attribute_names=None)
Expire the attributes on an instance.
Proxied for the Session class on behalf of the AsyncSession class.
Marks the attributes of an instance as out of date. When an expired attribute is next accessed, a query will be issued to the Session object’s current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.
To expire all objects in the Session simultaneously, use Session.expire_all().
The Session object’s default behavior is to expire all state whenever the Session.rollback() or Session.commit() methods are called, so that new state can be loaded for the new transaction. For this reason, calling Session.expire() only makes sense for the specific case that a non-ORM SQL statement was emitted in the current transaction.
Parameters
instance – The instance to be expired.
attribute_names – optional list of string attribute names indicating a subset of attributes to be expired.
See also
Refreshing / Expiring - introductory material
Session.expire()
Session.refresh()
Query.populate_existing()
method
sqlalchemy.ext.asyncio.AsyncSession.expire_all()
Expires all persistent instances within this Session.
Proxied for the Session class on behalf of the AsyncSession class.
When any attribute on a persistent instance is next accessed, a query will be issued using the Session object’s current transactional context in order to load all expired attributes for the given instance. Note that a highly isolated transaction will return the same values as were previously read in that same transaction, regardless of changes in database state outside of that transaction.
To expire individual objects and individual attributes on those objects, use Session.expire().
The Session object’s default behavior is to expire all state whenever the Session.rollback() or Session.commit() methods are called, so that new state can be loaded for the new transaction. For this reason, calling Session.expire_all() should not be needed when autocommit is False, assuming the transaction is isolated.
See also
Refreshing / Expiring - introductory material
method
sqlalchemy.ext.asyncio.AsyncSession.expunge(instance)
Remove the instance from this Session.
Proxied for the Session class on behalf of the AsyncSession class.
This will free all internal references to the instance. Cascading will be applied according to the expunge cascade rule.
method
sqlalchemy.ext.asyncio.AsyncSession.expunge_all()
Remove all object instances from this Session.
Proxied for the Session class on behalf of the AsyncSession class.
This is equivalent to calling expunge(obj) on all objects in this Session.
async method
sqlalchemy.ext.asyncio.AsyncSession.flush(objects=None)
Flush all the object changes to the database.
async method
sqlalchemy.ext.asyncio.AsyncSession.get(entity, ident, options=None, populate_existing=False, with_for_update=None, identity_token=None)
Return an instance based on the given primary key identifier, or None if not found.
method
sqlalchemy.ext.asyncio.AsyncSession.get_bind(mapper=None, clause=None, bind=None, _sa_skip_events=None, _sa_skip_for_implicit_returning=False)
Return a “bind” to which this Session is bound.
Proxied for the Session class on behalf of the AsyncSession class.
The “bind” is usually an instance of Engine, except in the case where the Session has been explicitly bound directly to a Connection.
For a multiply-bound or unbound Session, the mapper or clause arguments are used to determine the appropriate bind to return.
Note that the “mapper” argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush() call, etc.
The order of resolution is:
1. If mapper given and Session.binds is present, locate a bind based first on the mapper in use, then on the mapped class in use, then on any base classes that are present in the __mro__ of the mapped class, from more specific superclasses to more general.
2. If clause given and Session.binds is present, locate a bind based on Table objects found in the given clause present in Session.binds.
3. If Session.bind is present, return that.
4. If clause given, attempt to return a bind linked to the MetaData ultimately associated with the clause.
5. If mapper given, attempt to return a bind linked to the MetaData ultimately associated with the Table or other selectable to which the mapper is mapped.
6. If no bind can be found, UnboundExecutionError is raised.
Note that the Session.get_bind() method can be overridden on a user-defined subclass of Session to provide any kind of bind resolution scheme. See the example at Custom Vertical Partitioning.
Parameters
mapper – Optional mapper() mapped class or instance of Mapper. The bind can be derived from a Mapper first by consulting the “binds” map associated with this Session, and secondly by consulting the MetaData associated with the Table to which the Mapper is mapped for a bind.
clause – A ClauseElement (i.e. select(), text(), etc.). If the mapper argument is not present or could not produce a bind, the given expression construct will be searched for a bound element, typically a Table associated with bound MetaData.
See also
Partitioning Strategies (e.g. multiple database backends per Session)
Session.binds
Session.bind_mapper()
Session.bind_table()
method
classmethod sqlalchemy.ext.asyncio.AsyncSession.identity_key(*args, **kwargs)
Return an identity key.
Proxied for the Session class on behalf of the AsyncSession class.
This is an alias of identity_key().
method
sqlalchemy.ext.asyncio.AsyncSession.in_transaction()
Return True if this Session has begun a transaction.
Proxied for the Session class on behalf of the AsyncSession class.
New in version 1.4.
attribute
sqlalchemy.ext.asyncio.AsyncSession.info
A user-modifiable dictionary.
Proxied for the Session class on behalf of the AsyncSession class.
The initial value of this dictionary can be populated using the info argument to the Session constructor or sessionmaker constructor or factory methods. The dictionary here is always local to this Session and can be modified independently of all other Session objects.
attribute
sqlalchemy.ext.asyncio.AsyncSession.is_active
True if this Session is not in “partial rollback” state.
Proxied for the Session class on behalf of the AsyncSession class.
Changed in version 1.4: The Session no longer begins a new transaction immediately, so this attribute will be False when the Session is first instantiated.
“partial rollback” state typically indicates that the flush process of the Session has failed, and that the Session.rollback() method must be emitted in order to fully roll back the transaction.
If this Session is not in a transaction at all, the Session will autobegin when it is first used, so in this case Session.is_active will return True.
Otherwise, if this Session is within a transaction, and that transaction has not been rolled back internally, the Session.is_active will also return True.
method
sqlalchemy.ext.asyncio.AsyncSession.is_modified(instance, include_collections=True)
Return True if the given instance has locally modified attributes.
Proxied for the Session class on behalf of the AsyncSession class.
This method retrieves the history for each instrumented attribute on the instance and performs a comparison of the current value to its previously committed value, if any.
It is in effect a more expensive and accurate version of checking for the given instance in the Session.dirty collection; a full test for each attribute’s net “dirty” status is performed.
E.g.:
return session.is_modified(someobject)
A few caveats to this method apply:
Instances present in the Session.dirty collection may report False when tested with this method. This is because the object may have received change events via attribute mutation, thus placing it in Session.dirty, but ultimately the state is the same as that loaded from the database, resulting in no net change here.
Scalar attributes may not have recorded the previously set value when a new value was applied, if the attribute was not loaded, or was expired, at the time the new value was received - in these cases, the attribute is assumed to have a change, even if there is ultimately no net change against its database value. SQLAlchemy in most cases does not need the “old” value when a set event occurs, so it skips the expense of a SQL call if the old value isn’t present, based on the assumption that an UPDATE of the scalar value is usually needed, and in those few cases where it isn’t, is less expensive on average than issuing a defensive SELECT.
The “old” value is fetched unconditionally upon set only if the attribute container has the active_history flag set to True. This flag is set typically for primary key attributes and scalar object references that are not a simple many-to-one. To set this flag for any arbitrary mapped column, use the active_history argument with column_property().
Parameters
instance – mapped instance to be tested for pending changes.
include_collections – Indicates if multivalued collections should be included in the operation. Setting this to False is a way to detect only local-column based properties (i.e. scalar columns or many-to-one foreign keys) that would result in an UPDATE for this instance upon flush.
async method
sqlalchemy.ext.asyncio.AsyncSession.merge(instance, load=True)
Copy the state of a given instance into a corresponding instance within this AsyncSession.
attribute
sqlalchemy.ext.asyncio.AsyncSession.new
The set of all instances marked as ‘new’ within this Session.
Proxied for the Session class on behalf of the AsyncSession class.
attribute
sqlalchemy.ext.asyncio.AsyncSession.no_autoflush
Return a context manager that disables autoflush.
Proxied for the Session class on behalf of the AsyncSession class.
e.g.:
with session.no_autoflush:
    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()
Operations that proceed within the with: block will not be subject to flushes occurring upon query access. This is useful when initializing a series of objects which involve existing database queries, where the uncompleted object should not yet be flushed.
method
classmethod sqlalchemy.ext.asyncio.AsyncSession.object_session(instance)
Return the Session to which an object belongs.
Proxied for the Session class on behalf of the AsyncSession class.
This is an alias of object_session().
async method
sqlalchemy.ext.asyncio.AsyncSession.refresh(instance, attribute_names=None, with_for_update=None)
Expire and refresh the attributes on the given instance.
A query will be issued to the database and all attributes will be refreshed with their current database value.
This is the async version of the Session.refresh() method. See that method for a complete description of all options.
async method
sqlalchemy.ext.asyncio.AsyncSession.rollback()
Roll back the current transaction in progress.
async method
sqlalchemy.ext.asyncio.AsyncSession.run_sync(fn: Callable[[...], T], *arg, **kw) → T
Invoke the given sync callable, passing the synchronous Session underlying this AsyncSession as the first argument.
This method maintains the asyncio event loop all the way through to the database connection by running the given callable in a specially instrumented greenlet.
E.g.:
async with AsyncSession(async_engine) as session:
    await session.run_sync(some_business_method)
Note
The provided callable is invoked inline within the asyncio event loop, and will block on traditional IO calls. IO within this callable should only call into SQLAlchemy’s asyncio database APIs which will be properly adapted to the greenlet context.
async method
sqlalchemy.ext.asyncio.AsyncSession.scalar(statement: sqlalchemy.sql.base.Executable, params: Optional[Mapping] = None, execution_options: Mapping = {}, bind_arguments: Optional[Mapping] = None, **kw) → Any
Execute a statement and return a scalar result.
async method
sqlalchemy.ext.asyncio.AsyncSession.stream(statement, params=None, execution_options={}, bind_arguments=None, **kw)
Execute a statement and return a streaming AsyncResult object.
class sqlalchemy.ext.asyncio.AsyncSessionTransaction(session, nested=False)
A wrapper for the ORM SessionTransaction object.
This object is provided so that a transaction-holding object for AsyncSession.begin() may be returned.
The object supports both explicit calls to AsyncSessionTransaction.commit() and AsyncSessionTransaction.rollback(), as well as use as an async context manager.
New in version 1.4.
Class signature
class sqlalchemy.ext.asyncio.AsyncSessionTransaction (sqlalchemy.ext.asyncio.base.StartableContext)
async method
sqlalchemy.ext.asyncio.AsyncSessionTransaction.commit()
Commit this AsyncSessionTransaction.
async method
sqlalchemy.ext.asyncio.AsyncSessionTransaction.rollback()
Roll back this AsyncSessionTransaction.