Database

The Peewee Database object represents a connection to a database. The Database class is instantiated with all the information needed to open a connection to a database, and then can be used to:

  • Open and close connections.
  • Execute queries.
  • Manage transactions (and savepoints).
  • Introspect tables, columns, indexes, and constraints.

Peewee comes with support for SQLite, MySQL and Postgres. Each database class provides some basic, database-specific configuration options.

  from peewee import *

  # SQLite database using WAL journal mode and 64MB cache.
  sqlite_db = SqliteDatabase('/path/to/app.db', pragmas={
      'journal_mode': 'wal',
      'cache_size': -1024 * 64})

  # Connect to a MySQL database on network.
  mysql_db = MySQLDatabase('my_app', user='app', password='db_password',
                           host='10.1.0.8', port=3306)

  # Connect to a Postgres database.
  pg_db = PostgresqlDatabase('my_app', user='postgres', password='secret',
                             host='10.1.0.9', port=5432)

Peewee provides advanced support for SQLite, Postgres and CockroachDB via database-specific extension modules. To use the extended functionality, import the appropriate database-specific module and use the database class provided:

  from playhouse.sqlite_ext import SqliteExtDatabase

  # Use SQLite (will register a REGEXP function and set busy timeout to 3s).
  db = SqliteExtDatabase('/path/to/app.db', regexp_function=True, timeout=3,
                         pragmas={'journal_mode': 'wal'})


  from playhouse.postgres_ext import PostgresqlExtDatabase

  # Use Postgres (and register hstore extension).
  db = PostgresqlExtDatabase('my_app', user='postgres', register_hstore=True)


  from playhouse.cockroachdb import CockroachDatabase

  # Use CockroachDB.
  db = CockroachDatabase('my_app', user='root', port=26257, host='10.1.0.8')

  # CockroachDB connections may require a number of parameters, which can
  # alternatively be specified using a connection-string.
  db = CockroachDatabase('postgresql://...')

For more information on database extensions, see the Playhouse extensions documentation.

Initializing a Database

The Database initialization method expects the name of the database as the first parameter. Subsequent keyword arguments are passed to the underlying database driver when establishing the connection, allowing you to pass vendor-specific parameters easily.

For instance, with Postgresql it is common to need to specify the host, user and password when creating your connection. These are not standard Peewee Database parameters, so they will be passed directly back to psycopg2 when creating connections:

  db = PostgresqlDatabase(
      'database_name',      # Required by Peewee.
      user='postgres',      # Will be passed directly to psycopg2.
      password='secret',    # Ditto.
      host='db.mysite.com')  # Ditto.

As another example, the pymysql driver accepts a charset parameter which is not a standard Peewee Database parameter. To set this value, simply pass in charset alongside your other values:

  db = MySQLDatabase('database_name', user='www-data', charset='utf8mb4')

Consult your database driver’s documentation for the available parameters.

Using Postgresql

To connect to a Postgresql database, we will use PostgresqlDatabase. The first parameter is always the name of the database, and after that you can specify arbitrary psycopg2 parameters.

  psql_db = PostgresqlDatabase('my_database', user='postgres')

  class BaseModel(Model):
      """A base model that will use our Postgresql database"""
      class Meta:
          database = psql_db

  class User(BaseModel):
      username = CharField()

Playhouse, Peewee's collection of extensions, contains a Postgresql extension module which provides many postgres-specific features such as arrays, hstore, JSON fields, server-side cursors, and full-text search.

If you would like to use these awesome features, use the PostgresqlExtDatabase from the playhouse.postgres_ext module:

  from playhouse.postgres_ext import PostgresqlExtDatabase

  psql_db = PostgresqlExtDatabase('my_database', user='postgres')

Isolation level

As of Peewee 3.9.7, the isolation level can be specified as an initialization parameter, using the symbolic constants in psycopg2.extensions:

  from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE

  db = PostgresqlDatabase('my_app', user='postgres', host='db-host',
                          isolation_level=ISOLATION_LEVEL_SERIALIZABLE)

Note

In older versions, you can manually set the isolation level on the underlying psycopg2 connection. This can be done in a one-off fashion:

  db = PostgresqlDatabase(...)
  conn = db.connection()  # Returns current connection.

  from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
  conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

To run this every time a connection is created, subclass and implement the _initialize_connection() hook, which is designed for this purpose:

  class SerializedPostgresqlDatabase(PostgresqlDatabase):
      def _initialize_connection(self, conn):
          conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

Using CockroachDB

Connect to CockroachDB (CRDB) using the CockroachDatabase database class, defined in playhouse.cockroachdb:

  from playhouse.cockroachdb import CockroachDatabase

  db = CockroachDatabase('my_app', user='root', port=26257, host='localhost')

If you are using Cockroach Cloud, you may find it easier to specify the connection parameters using a connection-string:

  db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

Note

CockroachDB requires the psycopg2 (postgres) Python driver.

Note

The CockroachDB installation and getting-started guide can be found here: https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

CRDB provides client-side transaction retries, which are available using a special CockroachDatabase.run_transaction() helper-method. This method accepts a callable, which is responsible for executing any transactional statements that may need to be retried.

Simplest possible example of run_transaction():

  def create_user(email):
      # Callable that accepts a single argument (the database instance) and
      # which is responsible for executing the transactional SQL.
      def callback(db_ref):
          return User.create(email=email)

      return db.run_transaction(callback, max_attempts=10)

  huey = create_user('huey@example.com')

Note

The cockroachdb.ExceededMaxAttempts exception will be raised if the transaction cannot be committed after the given number of attempts. If the SQL is mal-formed, violates a constraint, etc., then the function will raise the exception to the caller.
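For illustration, here is a minimal sketch of catching this exception around the helper shown above; the handling strategy is just an example, not a recommendation:

  from playhouse.cockroachdb import ExceededMaxAttempts

  def create_user(email):
      def callback(db_ref):
          return User.create(email=email)
      try:
          return db.run_transaction(callback, max_attempts=10)
      except ExceededMaxAttempts:
          # Could not commit within 10 attempts (e.g. persistent contention);
          # handle, retry later, or surface the error to the caller.
          raise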

For more information, see the playhouse.cockroachdb documentation.

Using SQLite

To connect to a SQLite database, we will use SqliteDatabase. The first parameter is the filename containing the database, or the string ':memory:' to create an in-memory database. After the database filename, you can specify a list of pragmas or any other arbitrary sqlite3 parameters.

  sqlite_db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})

  class BaseModel(Model):
      """A base model that will use our Sqlite database."""
      class Meta:
          database = sqlite_db

  class User(BaseModel):
      username = TextField()
      # etc, etc

Peewee includes a SQLite extension module which provides many SQLite-specific features such as full-text search, json extension support, and much, much more. If you would like to use these awesome features, use the SqliteExtDatabase from the playhouse.sqlite_ext module:

  from playhouse.sqlite_ext import SqliteExtDatabase

  sqlite_db = SqliteExtDatabase('my_app.db', pragmas={
      'journal_mode': 'wal',     # WAL-mode.
      'cache_size': -64 * 1000,  # 64MB cache.
      'synchronous': 0})         # Let the OS manage syncing.

PRAGMA statements

SQLite allows run-time configuration of a number of parameters through PRAGMA statements (SQLite documentation). These statements are typically run when a new database connection is created. To run one or more PRAGMA statements against new connections, you can specify them as a dictionary or a list of 2-tuples containing the pragma name and value:

  db = SqliteDatabase('my_app.db', pragmas={
      'journal_mode': 'wal',
      'cache_size': 10000,  # 10000 pages, or ~40MB.
      'foreign_keys': 1,    # Enforce foreign-key constraints.
  })

PRAGMAs may also be configured dynamically using either the pragma() method or the special properties exposed on the SqliteDatabase object:

  # Set cache size to 64MB for *current connection*.
  db.pragma('cache_size', -1024 * 64)

  # Same as above.
  db.cache_size = -1024 * 64

  # Read the value of several pragmas:
  print('cache_size:', db.cache_size)
  print('foreign_keys:', db.foreign_keys)
  print('journal_mode:', db.journal_mode)
  print('page_size:', db.page_size)

  # Set foreign_keys pragma on current connection *AND* on all
  # connections opened subsequently.
  db.pragma('foreign_keys', 1, permanent=True)

Attention

Pragmas set using the pragma() method, by default, do not persist after the connection is closed. To configure a pragma to be run whenever a connection is opened, specify permanent=True.

Note

A full list of PRAGMA settings, their meaning and accepted values can be found in the SQLite documentation: http://sqlite.org/pragma.html

The following settings are what I use with SQLite for a typical web application database.

  pragma                     recommended setting   explanation
  journal_mode               wal                   allow readers and writers to co-exist
  cache_size                 -1 * data_size_kb     set page-cache size in KiB, e.g. -32000 = 32MB
  foreign_keys               1                     enforce foreign-key constraints
  ignore_check_constraints   0                     enforce CHECK constraints
  synchronous                0                     let OS handle fsync (use with caution)

Example database using the above options:

  db = SqliteDatabase('my_app.db', pragmas={
      'journal_mode': 'wal',
      'cache_size': -1 * 64000,  # 64MB
      'foreign_keys': 1,
      'ignore_check_constraints': 0,
      'synchronous': 0})

User-defined functions

SQLite can be extended with user-defined Python code. The SqliteDatabase class supports three types of user-defined extensions:

  • Functions - which take any number of parameters and return a single value.
  • Aggregates - which aggregate parameters from multiple rows and return a single value.
  • Collations - which describe how to sort some value.

Note

For even more extension support, see SqliteExtDatabase, which is in the playhouse.sqlite_ext module.

Example user-defined function:

  db = SqliteDatabase('analytics.db')

  from urllib.parse import urlparse

  @db.func('hostname')
  def hostname(url):
      if url is not None:
          return urlparse(url).netloc

  # Call this function in our code:
  # The following finds the most common hostnames of referrers by count:
  query = (PageView
           .select(fn.hostname(PageView.referrer), fn.COUNT(PageView.id))
           .group_by(fn.hostname(PageView.referrer))
           .order_by(fn.COUNT(PageView.id).desc()))

Example user-defined aggregate:

  from hashlib import md5

  @db.aggregate('md5')
  class MD5Checksum(object):
      def __init__(self):
          self.checksum = md5()

      def step(self, value):
          self.checksum.update(value.encode('utf-8'))

      def finalize(self):
          return self.checksum.hexdigest()

  # Usage:
  # The following computes an aggregate MD5 checksum for files broken
  # up into chunks and stored in the database.
  query = (FileChunk
           .select(FileChunk.filename, fn.MD5(FileChunk.data))
           .group_by(FileChunk.filename)
           .order_by(FileChunk.filename, FileChunk.sequence))

Example collation:

  @db.collation('ireverse')
  def collate_reverse(s1, s2):
      # Case-insensitive reverse.
      s1, s2 = s1.lower(), s2.lower()
      return (s1 < s2) - (s1 > s2)  # Equivalent to -cmp(s1, s2)

  # To use this collation to sort books in reverse order...
  Book.select().order_by(collate_reverse.collation(Book.title))

  # Or...
  Book.select().order_by(Book.title.asc(collation='ireverse'))

Example user-defined table-value function (see TableFunction and table_function for additional details):

  from playhouse.sqlite_ext import TableFunction

  db = SqliteDatabase('my_app.db')

  @db.table_function('series')
  class Series(TableFunction):
      columns = ['value']
      params = ['start', 'stop', 'step']

      def initialize(self, start=0, stop=None, step=1):
          """
          Table-functions declare an initialize() method, which is
          called with whatever arguments the user has called the
          function with.
          """
          self.start = self.current = start
          self.stop = stop or float('Inf')
          self.step = step

      def iterate(self, idx):
          """
          Iterate is called repeatedly by the SQLite database engine
          until the required number of rows has been read **or** the
          function raises a `StopIteration` signalling no more rows
          are available.
          """
          if self.current > self.stop:
              raise StopIteration
          ret, self.current = self.current, self.current + self.step
          return (ret,)

  # Usage:
  cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
  for value, in cursor:
      print(value)

  # Prints:
  # 0
  # 2
  # 4

For more information, see the API documentation for func(), aggregate(), collation() and table_function().

Set locking mode for transaction

SQLite transactions can be opened in three different modes:

  • Deferred (default) - only acquires lock when a read or write is performed. The first read creates a shared lock and the first write creates a reserved lock. Because the acquisition of the lock is deferred until actually needed, it is possible that another thread or process could create a separate transaction and write to the database after the BEGIN on the current thread has executed.
  • Immediate - a reserved lock is acquired immediately. In this mode, no other connection may write to the database or open an immediate or exclusive transaction. Other processes can continue to read from the database, however.
  • Exclusive - opens an exclusive lock which prevents all (except for read uncommitted) connections from accessing the database until the transaction is complete.

Example specifying the locking mode:

  db = SqliteDatabase('app.db')

  with db.atomic('EXCLUSIVE'):
      do_something()


  @db.atomic('IMMEDIATE')
  def some_other_function():
      # This function is wrapped in an "IMMEDIATE" transaction.
      do_something_else()

For more information, see the SQLite locking documentation. To learn more about transactions in Peewee, see the Managing Transactions documentation.

APSW, an Advanced SQLite Driver

Peewee also comes with an alternate SQLite database that uses apsw, an advanced Python SQLite driver. More information on APSW can be obtained on the APSW project website. APSW provides special features like:

  • Virtual tables, virtual file-systems, Blob I/O, backups and file control.
  • Connections can be shared across threads without any additional locking.
  • Transactions are managed explicitly by your code.
  • Unicode is handled correctly.
  • APSW is faster than the standard library sqlite3 module.
  • Exposes pretty much the entire SQLite C API to your Python app.

If you would like to use APSW, use the APSWDatabase from the apsw_ext module:

  from playhouse.apsw_ext import APSWDatabase

  apsw_db = APSWDatabase('my_app.db')

Using MySQL

To connect to a MySQL database, we will use MySQLDatabase. After the database name, you can specify arbitrary connection parameters that will be passed back to the driver (either MySQLdb or pymysql).

  mysql_db = MySQLDatabase('my_database')

  class BaseModel(Model):
      """A base model that will use our MySQL database"""
      class Meta:
          database = mysql_db

  class User(BaseModel):
      username = CharField()
      # etc, etc

Error 2006: MySQL server has gone away

This particular error can occur when MySQL kills an idle database connection. This typically happens with web apps that do not explicitly manage database connections. What happens is your application starts, a connection is opened to handle the first query that executes, and, since that connection is never closed, it remains open, waiting for more queries.

To fix this, make sure you are explicitly connecting to the database when you need to execute queries, and close your connection when you are done. In a web-application, this typically means you will open a connection when a request comes in, and close the connection when you return a response.

See the Framework Integration section for examples of configuring common web frameworks to manage database connections.
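As a minimal sketch of explicit, per-unit-of-work connection management (re-using the mysql_db and User examples from above; connection_context() is described under Connection Management later in this document):

  # Open a connection only for the duration of this block of work.
  with mysql_db.connection_context():
      charlie = User.get(User.username == 'charlie')

  # Or connect and close explicitly:
  mysql_db.connect(reuse_if_open=True)
  try:
      user_count = User.select().count()
  finally:
      if not mysql_db.is_closed():
          mysql_db.close()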

Connecting using a Database URL

The playhouse db_url module provides a helper connect() function that accepts a database URL and returns a Database instance.

Example code:

  import os

  from peewee import *
  from playhouse.db_url import connect

  # Connect to the database URL defined in the environment, falling
  # back to a local Sqlite database if no database URL is specified.
  db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')

  class BaseModel(Model):
      class Meta:
          database = db

Example database URLs:

  • sqlite:///my_database.db will create a SqliteDatabase instance for the file my_database.db in the current directory.
  • sqlite:///:memory: will create an in-memory SqliteDatabase instance.
  • postgresql://postgres:my_password@localhost:5432/my_database will create a PostgresqlDatabase instance. A username and password are provided, as well as the host and port to connect to.
  • mysql://user:passwd@ip:port/my_db will create a MySQLDatabase instance for the MySQL database my_db on the given host and port.
  • More examples in the db_url documentation.

Run-time database configuration

Sometimes the database connection settings are not known until run-time, when these values may be loaded from a configuration file or the environment. In these cases, you can defer the initialization of the database by specifying None as the database_name.

  database = PostgresqlDatabase(None)  # Un-initialized database.

  class SomeModel(Model):
      class Meta:
          database = database

If you try to connect or issue any queries while your database is uninitialized you will get an exception:

  >>> database.connect()
  Exception: Error, database not properly initialized before opening connection

To initialize your database, call the init() method with the database name and any additional keyword arguments:

  database_name = input('What is the name of the db? ')
  database.init(database_name, host='localhost', user='postgres')

For even more control over initializing your database, see the next section, Dynamically defining a database.

Dynamically defining a database

For even more control over how your database is defined/initialized, you can use the DatabaseProxy helper. DatabaseProxy objects act as a placeholder, and then at run-time you can swap it out for a different object. In the example below, we will swap out the database depending on how the app is configured:

  database_proxy = DatabaseProxy()  # Create a proxy for our db.

  class BaseModel(Model):
      class Meta:
          database = database_proxy  # Use proxy for our DB.

  class User(BaseModel):
      username = CharField()

  # Based on configuration, use a different database.
  if app.config['DEBUG']:
      database = SqliteDatabase('local.db')
  elif app.config['TESTING']:
      database = SqliteDatabase(':memory:')
  else:
      database = PostgresqlDatabase('mega_production_db')

  # Configure our proxy to use the db we specified in config.
  database_proxy.initialize(database)

Warning

Only use this method if your actual database driver varies at run-time. For instance, if your tests and local dev environment run on SQLite, but your deployed app uses PostgreSQL, you can use the DatabaseProxy to swap out engines at run-time.

However, if it is only connection values that vary at run-time, such as the path to the database file, or the database host, you should instead use Database.init(). See Run-time database configuration for more details.

Note

It may be easier to avoid the use of DatabaseProxy and instead use Database.bind() and related methods to set or change the database. See Setting the database at run-time for details.

Setting the database at run-time

We have seen three ways that databases can be configured with Peewee:

  # The usual way:
  db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})


  # Specify the details at run-time:
  db = SqliteDatabase(None)
  ...
  db.init(db_filename, pragmas={'journal_mode': 'wal'})


  # Or use a placeholder:
  db = DatabaseProxy()
  ...
  db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))

Peewee can also set or change the database for your model classes. This technique is used by the Peewee test suite to bind test model classes to various database instances when running the tests.

There are two sets of complementary methods: Database.bind() and Database.bind_ctx(), and Model.bind() and Model.bind_ctx().

As an example, we’ll declare two models without specifying any database:

  class User(Model):
      username = TextField()

  class Tweet(Model):
      user = ForeignKeyField(User, backref='tweets')
      content = TextField()
      timestamp = TimestampField()

Bind the models to a database at run-time:

  postgres_db = PostgresqlDatabase('my_app', user='postgres')
  sqlite_db = SqliteDatabase('my_app.db')

  # At this point, the User and Tweet models are NOT bound to any database.

  # Let's bind them to the Postgres database:
  postgres_db.bind([User, Tweet])

  # Now we will temporarily bind them to the sqlite database:
  with sqlite_db.bind_ctx([User, Tweet]):
      # User and Tweet are now bound to the sqlite database.
      assert User._meta.database is sqlite_db

  # User and Tweet are once again bound to the Postgres database.
  assert User._meta.database is postgres_db

The Model.bind() and Model.bind_ctx() methods work the same way when binding a given model class:

  # Bind the user model to the sqlite db. By default, Peewee will also
  # bind any models that are related to User via foreign-key as well.
  User.bind(sqlite_db)

  assert User._meta.database is sqlite_db
  assert Tweet._meta.database is sqlite_db  # Related models bound too.

  # Here we will temporarily bind *just* the User model to the postgres db.
  with User.bind_ctx(postgres_db, bind_backrefs=False):
      assert User._meta.database is postgres_db
      assert Tweet._meta.database is sqlite_db  # Has not changed.

  # And now User is back to being bound to the sqlite_db.
  assert User._meta.database is sqlite_db

The Testing Peewee Applications section of this document also contains some examples of using the bind() methods.

Thread-Safety and Multiple Databases

If you plan to change the database at run-time in a multi-threaded application, storing the model’s database in a thread-local will prevent race-conditions. This can be accomplished with a custom model Metadata class (see ThreadSafeDatabaseMetadata, included in playhouse.shortcuts):

  from peewee import *
  from playhouse.shortcuts import ThreadSafeDatabaseMetadata

  class BaseModel(Model):
      class Meta:
          # Instruct peewee to use our thread-safe metadata implementation.
          model_metadata_class = ThreadSafeDatabaseMetadata

The database can now be swapped safely while running in a multi-threaded environment using the familiar Database.bind() or Database.bind_ctx() methods.
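For example, a hedged sketch of a per-thread swap, assuming User subclasses the BaseModel above and sqlite_db is a previously-defined SqliteDatabase:

  import threading

  def worker():
      # Because the metadata stores the database in a thread-local, this
      # binding only affects queries issued from this thread.
      with sqlite_db.bind_ctx([User]):
          User.create(username='background-worker')

  t = threading.Thread(target=worker)
  t.start()
  t.join()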

Connection Management

To open a connection to a database, use the Database.connect() method:

  >>> db = SqliteDatabase(':memory:')  # In-memory SQLite database.
  >>> db.connect()
  True

If we try to call connect() on an already-open database, we get an OperationalError:

  >>> db.connect()
  Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/home/charles/pypath/peewee.py", line 2390, in connect
      raise OperationalError('Connection already opened.')
  peewee.OperationalError: Connection already opened.

To prevent this exception from being raised, we can call connect() with an additional argument, reuse_if_open:

  >>> db.close()  # Close connection.
  True
  >>> db.connect()
  True
  >>> db.connect(reuse_if_open=True)
  False

Note that the call to connect() returns False if the database connection was already open.

To close a connection, use the Database.close() method:

  >>> db.close()
  True

Calling close() on an already-closed connection will not result in an exception, but will return False:

  >>> db.connect()  # Open connection.
  True
  >>> db.close()  # Close connection.
  True
  >>> db.close()  # Connection already closed, returns False.
  False

You can test whether the database is closed using the Database.is_closed() method:

  >>> db.is_closed()
  True

Using autoconnect

It is not necessary to explicitly connect to the database before using it if the database is initialized with autoconnect=True (the default). Managing connections explicitly is considered a best practice, however, so you may want to disable the autoconnect behavior.

It is very helpful to be explicit about your connection lifetimes. If the connection fails, for instance, the exception will be caught when the connection is being opened, rather than some arbitrary time later when a query is executed. Furthermore, if using a connection pool, it is necessary to call connect() and close() to ensure connections are recycled properly.

For the best guarantee of correctness, disable autoconnect:

  db = PostgresqlDatabase('my_app', user='postgres', autoconnect=False)

Thread Safety

Peewee keeps track of the connection state using thread-local storage, making the Peewee Database object safe to use with multiple threads. Each thread will have its own connection, and as a result any given thread will only have a single connection open at a given time.
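A small sketch illustrating this behavior, in which each thread opens and closes its own connection without affecting the others:

  import threading

  db = SqliteDatabase('my_app.db')

  def run_queries():
      # This connection is local to the current thread.
      db.connect(reuse_if_open=True)
      try:
          db.execute_sql('SELECT 1')
      finally:
          db.close()

  threads = [threading.Thread(target=run_queries) for _ in range(4)]
  for t in threads:
      t.start()
  for t in threads:
      t.join()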

Context managers

The database object itself can be used as a context-manager, which opens a connection for the duration of the wrapped block of code. Additionally, a transaction is opened at the start of the wrapped block and committed before the connection is closed (unless an error occurs, in which case the transaction is rolled back).

  >>> db.is_closed()
  True
  >>> with db:
  ...     print(db.is_closed())  # db is open inside context manager.
  ...
  False
  >>> db.is_closed()  # db is closed.
  True

If you want to manage transactions separately, you can use the Database.connection_context() context manager.

  >>> with db.connection_context():
  ...     # db connection is open.
  ...     pass
  ...
  >>> db.is_closed()  # db connection is closed.
  True

The connection_context() method can also be used as a decorator:

  @db.connection_context()
  def prepare_database():
      # DB connection will be managed by the decorator, which opens
      # a connection, calls function, and closes upon returning.
      db.create_tables(MODELS)  # Create schema.
      load_fixture_data(db)

DB-API Connection Object

To obtain a reference to the underlying DB-API 2.0 connection, use the Database.connection() method. This method will return the currently-open connection object, if one exists, otherwise it will open a new connection.

  >>> db.connection()
  <sqlite3.Connection object at 0x7f94e9362f10>

Connection Pooling

Connection pooling is provided by the pool module, included in the playhouse extensions library. The pool supports:

  • Timeout after which connections will be recycled.
  • Upper bound on the number of open connections.

  from playhouse.pool import PooledPostgresqlExtDatabase

  db = PooledPostgresqlExtDatabase(
      'my_database',
      max_connections=8,
      stale_timeout=300,
      user='postgres')

  class BaseModel(Model):
      class Meta:
          database = db
The following pooled database classes are available: PooledPostgresqlDatabase, PooledPostgresqlExtDatabase, PooledMySQLDatabase, PooledSqliteDatabase and PooledSqliteExtDatabase.

For an in-depth discussion of peewee’s connection pool, see the Connection pool section of the playhouse documentation.

Testing Peewee Applications

When writing tests for an application that uses Peewee, it may be desirable to use a special database for tests. Another common practice is to run tests against a clean database, which means ensuring tables are empty at the start of each test.

To bind your models to a database at run-time, you can use the following methods:

  • Database.bind_ctx(), which returns a context-manager that will bind the given models to the database instance for the duration of the wrapped block.
  • Model.bind_ctx(), which likewise returns a context-manager that binds the model (and optionally its dependencies) to the given database for the duration of the wrapped block.
  • Database.bind(), which is a one-time operation that binds the models (and optionally its dependencies) to the given database.
  • Model.bind(), which is a one-time operation that binds the model (and optionally its dependencies) to the given database.

Depending on your use-case, one of these options may make more sense. For the examples below, I will use Database.bind().

Example test-case setup:

  # tests.py
  import unittest

  from my_app.models import EventLog, Relationship, Tweet, User

  MODELS = [User, Tweet, EventLog, Relationship]

  # Use an in-memory SQLite for tests.
  test_db = SqliteDatabase(':memory:')

  class BaseTestCase(unittest.TestCase):
      def setUp(self):
          # Bind model classes to test db. Since we have a complete list of
          # all models, we do not need to recursively bind dependencies.
          test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)

          test_db.connect()
          test_db.create_tables(MODELS)

      def tearDown(self):
          # Not strictly necessary since SQLite in-memory databases only live
          # for the duration of the connection, and in the next step we close
          # the connection...but a good practice all the same.
          test_db.drop_tables(MODELS)

          # Close connection to db.
          test_db.close()

          # If we wanted, we could re-bind the models to their original
          # database here. But for tests this is probably not necessary.
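Alternatively, because Database.bind_ctx() restores the previous bindings when the block exits, the same setup can be expressed with the context manager. A sketch, using the same MODELS and test_db as above:

  class BindCtxTestCase(unittest.TestCase):
      def run(self, result=None):
          # Bind the models only for the duration of the test; the previous
          # bindings are restored automatically when the block exits.
          with test_db.bind_ctx(MODELS, bind_refs=False, bind_backrefs=False):
              test_db.create_tables(MODELS)
              try:
                  return super().run(result)
              finally:
                  test_db.drop_tables(MODELS)
                  test_db.close()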

As an aside, and speaking from experience, I recommend testing your application using the same database backend you use in production, so as to avoid any potential compatibility issues.

If you’d like to see some more examples of how to run tests using Peewee, check out Peewee’s own test-suite.

Async with Gevent

gevent is recommended for doing asynchronous I/O with Postgresql or MySQL. Reasons I prefer gevent:

  • No need for special-purpose “loop-aware” re-implementations of everything. Third-party libraries using asyncio usually have to re-implement layers and layers of code as well as re-implementing the protocols themselves.
  • Gevent allows you to write your application in normal, clean, idiomatic Python. No need to litter every line with “async”, “await” and other noise. No callbacks, futures, tasks, promises. No cruft.
  • Gevent works with both Python 2 and Python 3.
  • Gevent is Pythonic. Asyncio is an un-pythonic abomination.

Besides monkey-patching socket, no special steps are required if you are using MySQL with a pure Python driver like pymysql or are using mysql-connector in pure-python mode. MySQL drivers written in C will require special configuration which is beyond the scope of this document.
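For example, a minimal sketch of the monkey-patching step when using pymysql; the patch must be applied before other modules import socket, and the connection details are placeholders:

  from gevent import monkey

  monkey.patch_all()  # Patch socket (and friends) first.

  from peewee import MySQLDatabase

  # pymysql is pure Python, so the patched socket module is all that is
  # needed for database I/O to yield cooperatively to other greenlets.
  db = MySQLDatabase('my_app', user='app', password='db_password',
                     host='10.1.0.8')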

For Postgres and psycopg2, which is a C extension, you can use the following code snippet to register event hooks that will make your connection async:

  from gevent.socket import wait_read, wait_write
  from psycopg2 import extensions

  # Call this function after monkey-patching socket (etc).
  def patch_psycopg2():
      extensions.set_wait_callback(_psycopg2_gevent_callback)

  def _psycopg2_gevent_callback(conn, timeout=None):
      while True:
          state = conn.poll()
          if state == extensions.POLL_OK:
              break
          elif state == extensions.POLL_READ:
              wait_read(conn.fileno(), timeout=timeout)
          elif state == extensions.POLL_WRITE:
              wait_write(conn.fileno(), timeout=timeout)
          else:
              raise ValueError('poll() returned unexpected result')

SQLite, because it is embedded in the Python application itself, does not do any socket operations that would be a candidate for non-blocking. Async has no effect one way or the other on SQLite databases.

Framework Integration

For web applications, it is common to open a connection when a request is received, and to close the connection when the response is delivered. In this section I will describe how to add hooks to your web app to ensure the database connection is handled properly.

These steps will ensure that regardless of whether you’re using a simple SQLite database, or a pool of multiple Postgres connections, peewee will handle the connections correctly.

Note

Applications that receive lots of traffic may benefit from using a connection pool to mitigate the cost of setting up and tearing down connections on every request.

Flask

Flask and peewee are a great combo and my go-to for projects of any size. Flask provides two hooks which we will use to open and close our db connection. We’ll open the connection when a request is received, then close it when the response is returned.

  from flask import Flask
  from peewee import *

  database = SqliteDatabase('my_app.db')
  app = Flask(__name__)

  # This hook ensures that a connection is opened to handle any queries
  # generated by the request.
  @app.before_request
  def _db_connect():
      database.connect()

  # This hook ensures that the connection is closed when we've finished
  # processing the request.
  @app.teardown_request
  def _db_close(exc):
      if not database.is_closed():
          database.close()

Django

While it’s less common to see peewee used with Django, it is actually very easy to use the two. To manage your peewee database connections with Django, the easiest way in my opinion is to add a middleware to your app. The middleware should be the very first in the list of middlewares, to ensure it runs first when a request is handled, and last when the response is returned.

If you have a django project named my_blog and your peewee database is defined in the module my_blog.db, you might add the following middleware class:

  # middleware.py
  from my_blog.db import database  # Import the peewee database instance.


  def PeeweeConnectionMiddleware(get_response):
      def middleware(request):
          database.connect()
          try:
              response = get_response(request)
          finally:
              if not database.is_closed():
                  database.close()
          return response
      return middleware


  # Older Django < 1.10 middleware.
  class PeeweeConnectionMiddleware(object):
      def process_request(self, request):
          database.connect()

      def process_response(self, request, response):
          if not database.is_closed():
              database.close()
          return response

To ensure this middleware gets executed, add it to your settings module:

  # settings.py
  MIDDLEWARE_CLASSES = (
      # Our custom middleware appears first in the list.
      'my_blog.middleware.PeeweeConnectionMiddleware',

      # These are the default Django 1.7 middlewares. Yours may differ,
      # but the important thing is that our Peewee middleware comes first.
      'django.middleware.common.CommonMiddleware',
      'django.contrib.sessions.middleware.SessionMiddleware',
      'django.middleware.csrf.CsrfViewMiddleware',
      'django.contrib.auth.middleware.AuthenticationMiddleware',
      'django.contrib.messages.middleware.MessageMiddleware',
  )

  # ... other Django settings ...

Bottle

I haven’t used bottle myself, but looking at the documentation I believe the following code should ensure the database connections are properly managed:

  # app.py
  from bottle import hook  #, route, etc, etc.
  from peewee import *

  db = SqliteDatabase('my-bottle-app.db')

  @hook('before_request')
  def _connect_db():
      db.connect()

  @hook('after_request')
  def _close_db():
      if not db.is_closed():
          db.close()

  # Rest of your bottle app goes here.

Web.py

See the documentation for application processors.

  db = SqliteDatabase('my_webpy_app.db')

  def connection_processor(handler):
      db.connect()
      try:
          return handler()
      finally:
          if not db.is_closed():
              db.close()

  app.add_processor(connection_processor)

Tornado

It looks like Tornado’s RequestHandler class implements two hooks which can be used to open and close connections when a request is handled.

  from tornado.web import RequestHandler

  db = SqliteDatabase('my_db.db')

  class PeeweeRequestHandler(RequestHandler):
      def prepare(self):
          db.connect()
          return super(PeeweeRequestHandler, self).prepare()

      def on_finish(self):
          if not db.is_closed():
              db.close()
          return super(PeeweeRequestHandler, self).on_finish()

In your app, instead of extending the default RequestHandler, now you can extend PeeweeRequestHandler.

Note that this does not address how to use peewee asynchronously with Tornado or another event loop.

Wheezy.web

The connection handling code can be placed in a middleware.

  def peewee_middleware(request, following):
      db.connect()
      try:
          response = following(request)
      finally:
          if not db.is_closed():
              db.close()
      return response

  app = WSGIApplication(middleware=[
      lambda x: peewee_middleware,
      # ... other middlewares ...
  ])

Thanks to GitHub user @tuukkamustonen for submitting this code.

Falcon

The connection handling code can be placed in a middleware component.

  import falcon
  from peewee import *

  database = SqliteDatabase('my_app.db')

  class PeeweeConnectionMiddleware(object):
      def process_request(self, req, resp):
          database.connect()

      def process_response(self, req, resp, resource, req_succeeded):
          if not database.is_closed():
              database.close()

  application = falcon.API(middleware=[
      PeeweeConnectionMiddleware(),
      # ... other middlewares ...
  ])

Pyramid

Set up a Request factory that handles database connection lifetime as follows:

  from pyramid.request import Request

  db = SqliteDatabase('pyramidapp.db')

  class MyRequest(Request):
      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          db.connect()
          self.add_finished_callback(self.finish)

      def finish(self, request):
          if not db.is_closed():
              db.close()

In your application main() make sure MyRequest is used as request_factory:

  def main(global_settings, **settings):
      config = Configurator(settings=settings, ...)
      config.set_request_factory(MyRequest)

CherryPy

See Publish/Subscribe pattern.

  def _db_connect():
      db.connect()

  def _db_close():
      if not db.is_closed():
          db.close()

  cherrypy.engine.subscribe('before_request', _db_connect)
  cherrypy.engine.subscribe('after_request', _db_close)

Sanic

In Sanic, the connection handling code can be placed in the request and response middleware.

  # app.py
  @app.middleware('request')
  async def handle_request(request):
      db.connect()

  @app.middleware('response')
  async def handle_response(request, response):
      if not db.is_closed():
          db.close()

FastAPI

FastAPI provides startup and shutdown event hooks, which we use here to open the database connection when the application starts and close it when the application shuts down.

  from fastapi import FastAPI
  from peewee import *

  db = SqliteDatabase('my_app.db')
  app = FastAPI()

  # This hook ensures that a connection is opened when the application
  # starts up.
  @app.on_event("startup")
  def startup():
      db.connect()

  # This hook ensures that the connection is closed when the application
  # shuts down.
  @app.on_event("shutdown")
  def shutdown():
      if not db.is_closed():
          db.close()
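The hooks above keep a single connection open for the life of the application. If you prefer per-request connections, as with the other frameworks in this section, one possible sketch uses FastAPI's HTTP middleware (this is one approach, not the only one):

  @app.middleware("http")
  async def db_connection_middleware(request, call_next):
      db.connect(reuse_if_open=True)
      try:
          return await call_next(request)
      finally:
          if not db.is_closed():
              db.close()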

Other frameworks

Don’t see your framework here? Please open a GitHub ticket and I’ll see about adding a section, or better yet, submit a documentation pull-request.

Executing Queries

SQL queries will typically be executed by calling execute() on a query constructed using the query-builder APIs (or by simply iterating over a query object in the case of a Select query). For cases where you wish to execute SQL directly, you can use the Database.execute_sql() method.

  db = SqliteDatabase('my_app.db')
  db.connect()

  # Example of executing a simple query and ignoring the results.
  db.execute_sql("ATTACH DATABASE ':memory:' AS cache;")

  # Example of iterating over the results of a query using the cursor.
  cursor = db.execute_sql('SELECT * FROM users WHERE status = ?', (ACTIVE,))
  for row in cursor.fetchall():
      # Do something with row, which is a tuple containing column data.
      pass
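For comparison, a brief sketch of running the same kind of query through the query-builder APIs; this assumes a User model with username and status fields bound to this database, and ACTIVE is a placeholder constant as above:

  # SELECT queries can simply be iterated.
  query = User.select().where(User.status == ACTIVE)
  for user in query:
      print(user.username)

  # Other queries are run by calling execute(), which for UPDATE and DELETE
  # returns the number of rows modified.
  n_rows = User.delete().where(User.status != ACTIVE).execute()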

Managing Transactions

Peewee provides several interfaces for working with transactions. The most general is the Database.atomic() method, which also supports nested transactions. atomic() blocks will be run in a transaction or savepoint, depending on the level of nesting.

If an exception occurs in a wrapped block, the current transaction/savepoint will be rolled back. Otherwise the statements will be committed at the end of the wrapped block.

Note

While inside a block wrapped by the atomic() context manager, you can explicitly rollback or commit at any point by calling Transaction.rollback() or Transaction.commit(). When you do this inside a wrapped block of code, a new transaction will be started automatically.

  with db.atomic() as transaction:  # Opens new transaction.
      error_saving = False
      try:
          save_some_objects()
      except ErrorSavingData:
          # Because this block of code is wrapped with "atomic", a
          # new transaction will begin automatically after the call
          # to rollback().
          transaction.rollback()
          error_saving = True

      create_report(error_saving=error_saving)
      # Note: no need to call commit. Since this marks the end of the
      # wrapped block of code, the `atomic` context manager will
      # automatically call commit for us.

Note

atomic() can be used as either a context manager or a decorator.

Context manager

Using atomic as context manager:

  db = SqliteDatabase(':memory:')

  with db.atomic() as txn:
      # This is the outer-most level, so this block corresponds to
      # a transaction.
      User.create(username='charlie')

      with db.atomic() as nested_txn:
          # This block corresponds to a savepoint.
          User.create(username='huey')

          # This will roll back the above create() query.
          nested_txn.rollback()

      User.create(username='mickey')

  # When the block ends, the transaction is committed (assuming no error
  # occurs). At that point there will be two users, "charlie" and "mickey".

You can use the atomic method to perform get or create operations as well:

  try:
      with db.atomic():
          user = User.create(username=username)
      return 'Success'
  except peewee.IntegrityError:
      return 'Failure: %s is already in use.' % username

Decorator

Using atomic as a decorator:

  @db.atomic()
  def create_user(username):
      # This statement will run in a transaction. If the caller is already
      # running in an `atomic` block, then a savepoint will be used instead.
      return User.create(username=username)

  create_user('charlie')

Nesting Transactions

atomic() provides transparent nesting of transactions. When using atomic(), the outer-most call will be wrapped in a transaction, and any nested calls will use savepoints.

  with db.atomic() as txn:
      perform_operation()

      with db.atomic() as nested_txn:
          perform_another_operation()

Peewee supports nested transactions through the use of savepoints (for more information, see savepoint()).

Explicit transaction

If you wish to explicitly run code in a transaction, you can use transaction(). Like atomic(), transaction() can be used as a context manager or as a decorator.

If an exception occurs in a wrapped block, the transaction will be rolled back. Otherwise the statements will be committed at the end of the wrapped block.

  db = SqliteDatabase(':memory:')

  with db.transaction() as txn:
      # Delete the user and their associated tweets.
      user.delete_instance(recursive=True)

Transactions can be explicitly committed or rolled-back within the wrapped block. When this happens, a new transaction will be started.

  with db.transaction() as txn:
      User.create(username='mickey')
      txn.commit()  # Changes are saved and a new transaction begins.
      User.create(username='huey')

      # Roll back. "huey" will not be saved, but since "mickey" was already
      # committed, that row will remain in the database.
      txn.rollback()

  with db.transaction() as txn:
      User.create(username='whiskers')

      # Roll back changes, which removes "whiskers".
      txn.rollback()

      # Create a new row for "mr. whiskers" which will be implicitly committed
      # at the end of the `with` block.
      User.create(username='mr. whiskers')

Note

If you attempt to nest transactions with peewee using the transaction() context manager, only the outer-most transaction will be used. However if an exception occurs in a nested block, this can lead to unpredictable behavior, so it is strongly recommended that you use atomic().

Explicit Savepoints

Just as you can explicitly create transactions, you can also explicitly create savepoints using the savepoint() method. Savepoints must occur within a transaction, but can be nested arbitrarily deep.

  with db.transaction() as txn:
      with db.savepoint() as sp:
          User.create(username='mickey')

          with db.savepoint() as sp2:
              User.create(username='zaizee')
              sp2.rollback()  # "zaizee" will not be saved, but "mickey" will be.

Warning

If you manually commit or roll back a savepoint, a new savepoint will not automatically be created. This differs from the behavior of transaction, which will automatically open a new transaction after manual commit/rollback.

Autocommit Mode

By default, Peewee operates in autocommit mode, such that any statements executed outside of a transaction are run in their own transaction. To group multiple statements into a transaction, Peewee provides the atomic() context-manager/decorator. This should cover all use-cases, but in the unlikely event you want to temporarily disable Peewee’s transaction management completely, you can use the Database.manual_commit() context-manager/decorator.

Here is how you might emulate the behavior of the transaction() context manager:

  with db.manual_commit():
      db.begin()  # Have to begin transaction explicitly.
      try:
          user.delete_instance(recursive=True)
      except:
          db.rollback()  # Rollback! An error occurred.
          raise
      else:
          try:
              db.commit()  # Commit changes.
          except:
              db.rollback()
              raise

Again – I don’t anticipate anyone needing this, but it’s here just in case.

Database Errors

The Python DB-API 2.0 spec describes several types of exceptions. Because most database drivers have their own implementations of these exceptions, Peewee simplifies things by providing its own wrappers around any implementation-specific exception classes. That way, you don’t need to worry about importing any special exception classes, you can just use the ones from peewee:

  • DatabaseError
  • DataError
  • IntegrityError
  • InterfaceError
  • InternalError
  • NotSupportedError
  • OperationalError
  • ProgrammingError

Note

All of these error classes extend PeeweeException.
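For example, a short sketch of catching the peewee wrappers rather than driver-specific exceptions:

  from peewee import IntegrityError, OperationalError, PeeweeException

  try:
      with db.atomic():
          User.create(username='charlie')  # May violate a unique constraint.
  except IntegrityError:
      print('That username is already taken.')
  except OperationalError:
      print('Problem communicating with the database.')
  except PeeweeException:
      # Catch-all for any other peewee-wrapped database error.
      raise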

Logging queries

All queries are logged to the peewee namespace using the standard library logging module. Queries are logged using the DEBUG level. If you’re interested in doing something with the queries, you can simply register a handler.

  # Print all queries to stderr.
  import logging

  logger = logging.getLogger('peewee')
  logger.addHandler(logging.StreamHandler())
  logger.setLevel(logging.DEBUG)
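For example, a sketch that also writes timestamped queries to a file, using only the standard library logging module:

  import logging

  logger = logging.getLogger('peewee')
  logger.setLevel(logging.DEBUG)

  file_handler = logging.FileHandler('queries.log')
  file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
  logger.addHandler(file_handler)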

Adding a new Database Driver

Peewee comes with built-in support for Postgres, MySQL and SQLite. These databases are very popular and run the gamut from fast, embeddable databases to heavyweight servers suitable for large-scale deployments. That being said, there are a ton of cool databases out there and adding support for your database-of-choice should be really easy, provided the driver supports the DB-API 2.0 spec.

The DB-API 2.0 spec should be familiar to you if you’ve used the standard library sqlite3 driver, psycopg2 or the like. Peewee currently relies on a handful of parts:

  • Connection.commit
  • Connection.execute
  • Connection.rollback
  • Cursor.description
  • Cursor.fetchone

These methods are generally wrapped up in higher-level abstractions and exposed by the Database, so even if your driver doesn’t do these exactly you can still get a lot of mileage out of peewee. An example is the apsw sqlite driver in the “playhouse” module.

The first thing is to provide a subclass of Database that will open a connection.

  from peewee import Database

  import foodb  # Our fictional DB-API 2.0 driver.


  class FooDatabase(Database):
      def _connect(self):
          return foodb.connect(self.database, **self.connect_params)

The Database provides a higher-level API and is responsible for executing queries, creating tables and indexes, and introspecting the database to get lists of tables. The above implementation is the absolute minimum needed, though some features will not work – for best results you will want to additionally add a method for extracting a list of tables and indexes for a table from the database. We’ll pretend that FooDB is a lot like MySQL and has special “SHOW” statements:

  class FooDatabase(Database):
      def _connect(self):
          return foodb.connect(self.database, **self.connect_params)

      def get_tables(self):
          res = self.execute_sql('SHOW TABLES;')
          return [r[0] for r in res.fetchall()]

Other things the database handles that are not covered here include:

  • last_insert_id() and rows_affected()
  • param and quote, which tell the SQL-generating code how to add parameter placeholders and quote entity names.
  • field_types for mapping data-types like INT or TEXT to their vendor-specific type names.
  • operations for mapping operations such as “LIKE/ILIKE” to their database equivalent.

Refer to the Database API reference or the source code for details.
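To give a flavor of what these look like, here is a hedged sketch of how such attributes might be declared on our fictional FooDatabase; the specific values are illustrative assumptions, not taken from any real driver:

  class FooDatabase(Database):
      # Parameter placeholder and identifier-quoting characters used by the
      # SQL-generating code.
      param = '?'
      quote = '""'

      # Map peewee's generic column types to FooDB's vendor-specific names.
      field_types = {'AUTO': 'SERIAL', 'TEXT': 'CLOB'}

      # Map peewee operations to FooDB's SQL operators.
      operations = {'LIKE': 'LIKE', 'ILIKE': 'LIKE'}

      def _connect(self):
          return foodb.connect(self.database, **self.connect_params)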

Note

If your driver conforms to the DB-API 2.0 spec, there shouldn’t be much work needed to get up and running.

Our new database can be used just like any of the other database subclasses:

  from peewee import *
  from foodb_ext import FooDatabase

  db = FooDatabase('my_database', user='foo', password='secret')

  class BaseModel(Model):
      class Meta:
          database = db

  class Blog(BaseModel):
      title = CharField()
      contents = TextField()
      pub_date = DateTimeField()