Playhouse, extensions to Peewee
Peewee comes with numerous extension modules which are collected under the playhouse namespace. Despite the silly name, there are some very useful extensions, particularly those that expose vendor-specific database features like the SQLite Extensions and Postgresql Extensions.
Below you will find a loosely organized listing of the various modules that make up the playhouse.
Database drivers / vendor-specific database functionality
- SQLite Extensions (on its own page)
- SqliteQ
- Sqlite User-Defined Functions
- apsw, an advanced sqlite driver
- Sqlcipher backend
- Postgresql Extensions
- Cockroach Database
- MySQL Extensions
High-level features
Database management and framework integration
Sqlite Extensions
The Sqlite extensions have been moved to their own page.
SqliteQ
The playhouse.sqliteq module provides a subclass of SqliteExtDatabase that will serialize concurrent writes to a SQLite database. SqliteQueueDatabase can be used as a drop-in replacement for the regular SqliteDatabase if you want simple read and write access to a SQLite database from multiple threads.
SQLite only allows one connection to write to the database at any given time. As a result, if you have a multi-threaded application (like a web-server, for example) that needs to write to the database, you may see occasional errors when one or more of the threads attempting to write cannot acquire the lock.
SqliteQueueDatabase is designed to simplify things by sending all write queries through a single, long-lived connection. The benefit is that you get the appearance of multiple threads writing to the database without conflicts or timeouts. The downside, however, is that you cannot issue write transactions that encompass multiple queries: all writes run in autocommit mode, essentially.
Note
The module gets its name from the fact that all write queries get put into a thread-safe queue. A single worker thread listens to the queue and executes all queries that are sent to it.
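The queue-plus-worker idea described in this note can be sketched with the standard library alone. This is only an illustration of the pattern, not the playhouse.sqliteq implementation; the names writer_loop, run_demo, and the event table are hypothetical.

```python
import os
import queue
import sqlite3
import tempfile
import threading

# Hypothetical names throughout; this is not playhouse.sqliteq's code.
_STOP = object()  # Sentinel telling the worker to exit.


def writer_loop(path, write_queue):
    # The worker owns the only connection that ever writes.
    conn = sqlite3.connect(path)
    while True:
        item = write_queue.get()
        if item is _STOP:
            break
        sql, params = item
        conn.execute(sql, params)
        conn.commit()  # Every statement is committed on its own.
    conn.close()


def run_demo():
    path = os.path.join(tempfile.mkdtemp(), 'queue_demo.db')
    setup = sqlite3.connect(path)
    setup.execute('CREATE TABLE event (id INTEGER PRIMARY KEY, source TEXT)')
    setup.commit()
    setup.close()

    write_queue = queue.Queue(maxsize=64)
    worker = threading.Thread(target=writer_loop, args=(path, write_queue))
    worker.start()

    def producer(name):
        # Producers never touch SQLite; they only enqueue work.
        for _ in range(10):
            write_queue.put(('INSERT INTO event (source) VALUES (?)', (name,)))

    producers = [threading.Thread(target=producer, args=('t%d' % i,))
                 for i in range(4)]
    for thread in producers:
        thread.start()
    for thread in producers:
        thread.join()

    write_queue.put(_STOP)  # Queued after all writes, so the backlog drains first.
    worker.join()

    reader = sqlite3.connect(path)
    (count,) = reader.execute('SELECT COUNT(*) FROM event').fetchone()
    reader.close()
    return count
```

Because each queued statement is committed individually, the sketch also shows why multi-statement transactions cannot be expressed through such a queue.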
Transactions
Because all queries are serialized and executed by a single worker thread, it is possible for transactional SQL from separate threads to be executed out-of-order. In the example below, the transaction started by thread “B” is rolled back by thread “A” (with bad consequences!):
- Thread A: UPDATE transplants SET organ='liver', …;
- Thread B: BEGIN TRANSACTION;
- Thread B: UPDATE life_support_system SET timer += 60 …;
- Thread A: ROLLBACK; -- Oh no….
Since there is a potential for queries from separate transactions to be interleaved, the transaction() and atomic() methods are disabled on SqliteQueueDatabase.
For cases when you wish to temporarily write to the database from a different thread, you can use the pause() and unpause() methods. These methods block the caller until the writer thread is finished with its current workload. The writer then disconnects and the caller takes over until unpause is called.
The stop(), start(), and is_stopped() methods can also be used to control the writer thread.
Note
Take a look at SQLite’s isolation documentation for more information about how SQLite handles concurrent connections.
Code sample
Creating a database instance does not require any special handling. The SqliteQueueDatabase accepts some special parameters which you should be aware of, though. If you are using gevent, you must specify use_gevent=True when instantiating your database; this way Peewee will know to use the appropriate objects for handling queueing, thread creation, and locking.
from playhouse.sqliteq import SqliteQueueDatabase
db = SqliteQueueDatabase(
    'my_app.db',
    use_gevent=False,  # Use the standard library "threading" module.
    autostart=False,  # The worker thread now must be started manually.
    queue_max_size=64,  # Max. # of pending writes that can accumulate.
    results_timeout=5.0)  # Max. time to wait for query to be executed.
If autostart=False, as in the above example, you will need to call start() to bring up the worker thread that will do the actual write query execution.
# Assumes a Flask application object named "app".
@app.before_first_request
def _start_worker_threads():
    db.start()
If you plan on performing SELECT queries or otherwise need to access the database, you will need to call connect() and close() as you would with any other database instance.
When your application is ready to terminate, use the stop() method to shut down the worker thread. If there was a backlog of work, then this method will block until all pending work is finished (though no new work is allowed).
import atexit
@atexit.register
def _stop_worker_threads():
    db.stop()
Lastly, the is_stopped() method can be used to determine whether the database writer is up and running.
Sqlite User-Defined Functions
The sqlite_udf playhouse module contains a number of user-defined functions, aggregates, and table-valued functions, which you may find useful. The functions are grouped in collections and you can register these user-defined extensions individually, by collection, or register everything.
Scalar functions are functions which take a number of parameters and return a single value. For example, converting a string to upper-case, or calculating the MD5 hex digest.
Aggregate functions are like scalar functions that operate on multiple rows of data, producing a single result. For example, calculating the sum of a list of integers, or finding the smallest value in a particular column.
Table-valued functions are simply functions that can return multiple rows of data. For example, a regular-expression search function that returns all the matches in a given string, or a function that accepts two dates and generates all the intervening days.
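To make the scalar/aggregate distinction concrete, here is a minimal sketch that uses only the standard library sqlite3 module rather than the playhouse helpers. The function names upper_case and smallest and the item table are made up for the example. The standard library has no direct hook for table-valued functions, so those are not shown.

```python
import sqlite3


def upper_case(s):
    # Scalar: called once per row, returns one value per call.
    return s.upper() if s is not None else None


class Smallest:
    # Aggregate: sqlite3 calls step() once per row and finalize() once at the end.
    def __init__(self):
        self.current = None

    def step(self, value):
        if value is not None and (self.current is None or value < self.current):
            self.current = value

    def finalize(self):
        return self.current


conn = sqlite3.connect(':memory:')
conn.create_function('upper_case', 1, upper_case)
conn.create_aggregate('smallest', 1, Smallest)

conn.execute('CREATE TABLE item (name TEXT, price INTEGER)')
conn.executemany('INSERT INTO item VALUES (?, ?)',
                 [('pen', 3), ('ink', 7), ('pad', 5)])

# One output row per input row.
scalar_rows = conn.execute(
    'SELECT upper_case(name) FROM item ORDER BY name').fetchall()
# Many input rows collapse into a single result.
aggregate_row = conn.execute('SELECT smallest(price) FROM item').fetchone()
```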
Note
To use table-valued functions, you will need to build the playhouse._sqlite_ext C extension.
Registering user-defined functions:
from peewee import SqliteDatabase
from playhouse.sqlite_udf import register_all, register_groups
from playhouse.sqlite_udf import register_aggregate_groups
db = SqliteDatabase('my_app.db')
# Register *all* functions.
register_all(db)
# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')
# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')
# If you only wish to register a single function, then you can:
from playhouse.sqlite_udf import gzip, gunzip
db.register_function(gzip, 'gzip')
db.register_function(gunzip, 'gunzip')
Using a library function (“hostname”):
# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
         .select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
         .group_by(fn.hostname(Link.url))
         .order_by(fn.COUNT(Link.id).desc())
         .tuples())
# Print the hostname along with number of links associated with it.
for host, count in query:
    print('%s: %s' % (host, count))
Functions, listed by collection name
Scalar functions are indicated by (f), aggregate functions by (a), and table-valued functions by (t).
CONTROL_FLOW
if_then_else(cond, truthy[, falsey=None])
Simple ternary-type operator, where, depending on the truthiness of the cond parameter, either the truthy or falsey value will be returned.
DATE
strip_tz(date_str)
Parameters: date_str – A datetime, encoded as a string.
Returns: The datetime with any timezone info stripped off.
The time is not adjusted in any way; the timezone is simply removed.
humandelta(nseconds[, glue=', '])
Returns: Easy-to-read description of timedelta.
Example: 86471 -> "1 day, 1 minute, 11 seconds"
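The conversion in the example can be reproduced with a small pure-Python sketch. The function human_delta_sketch is hypothetical and only handles days, hours, minutes, and seconds; the library function may use additional units.

```python
def human_delta_sketch(nseconds, glue=', '):
    # Hypothetical re-implementation for illustration only.
    units = (('day', 86400), ('hour', 3600), ('minute', 60), ('second', 1))
    remaining = int(nseconds)
    parts = []
    for name, size in units:
        count, remaining = divmod(remaining, size)
        if count:
            # Pluralize the unit name unless the count is exactly one.
            parts.append('%d %s%s' % (count, name, '' if count == 1 else 's'))
    return glue.join(parts)


print(human_delta_sketch(86471))  # 1 day, 1 minute, 11 seconds
```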
mintdiff(datetime_value)
Parameters: datetime_value – A date-time.
Returns: Minimum difference between any two values in list.
Aggregate function that computes the minimum difference between any two datetimes.
avgtdiff(datetime_value)
Parameters: datetime_value – A date-time.
Returns: Average difference between values in list.
Aggregate function that computes the average difference between consecutive values in the list.
duration(datetime_value)
Parameters: datetime_value – A date-time.
Returns: Duration from smallest to largest value in list, in seconds.
Aggregate function that computes the duration from the smallest to the largest value in the list, returned in seconds.
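The three date aggregates above differ only in how they summarize the gaps between values. The sketch below computes all three over a plain Python list; time_diffs_sketch is a hypothetical helper, it needs at least two values, and reporting every result in seconds is an assumption made for the illustration (the text above only states the unit for duration).

```python
from datetime import datetime


def time_diffs_sketch(values):
    # Hypothetical helper; requires at least two datetimes.
    ordered = sorted(values)
    # Gaps between consecutive values, in seconds.
    gaps = [(b - a).total_seconds() for a, b in zip(ordered, ordered[1:])]
    return {
        # The closest pair of values is always adjacent once sorted.
        'mintdiff': min(gaps),
        'avgtdiff': sum(gaps) / len(gaps),
        'duration': (ordered[-1] - ordered[0]).total_seconds(),
    }


stats = time_diffs_sketch([
    datetime(2017, 1, 1, 10, 0),
    datetime(2017, 1, 1, 10, 5),
    datetime(2017, 1, 1, 10, 1),
])
```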
date_series(start, stop[, step_seconds=86400])
Table-valued function that returns rows consisting of the date and/or time values encountered iterating from start to stop, step_seconds at a time.
Additionally, if start does not have a time component and step_seconds is greater-than-or-equal-to one day (86400 seconds), the values returned will be dates. Conversely, if start does not have a date component, values will be returned as times. Otherwise values are returned as datetimes.
Example:
SELECT * FROM date_series('2017-01-28', '2017-02-02');
value
-----
2017-01-28
2017-01-29
2017-01-30
2017-01-31
2017-02-01
2017-02-02
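The same series can be generated in plain Python, which may help clarify the stepping rule. This is a simplified sketch: date_series_sketch is a hypothetical name, it only accepts 'YYYY-MM-DD' inputs, and it ignores the time-only case described above.

```python
from datetime import datetime, timedelta


def date_series_sketch(start, stop, step_seconds=86400):
    # Hypothetical helper; handles only 'YYYY-MM-DD' inputs.
    current = datetime.strptime(start, '%Y-%m-%d')
    end = datetime.strptime(stop, '%Y-%m-%d')
    step = timedelta(seconds=step_seconds)
    while current <= end:  # The stop value is included, as in the SQL example.
        if step_seconds >= 86400:
            yield current.strftime('%Y-%m-%d')
        else:
            yield current.strftime('%Y-%m-%d %H:%M:%S')
        current += step


series = list(date_series_sketch('2017-01-28', '2017-02-02'))
```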
FILE
file_ext(filename)
Parameters: filename (str) – Filename to extract extension from.
Returns: The file extension, including the leading ".".
file_read(filename)
Parameters: filename (str) – Filename to read.
Returns: Contents of the file.
HELPER
gzip(data[, compression=9])
Returns: Compressed binary data.
gunzip(data)
Parameters: data (bytes) – Compressed data.
Returns: Uncompressed binary data.
hostname(url)
Parameters: url (str) – URL to extract hostname from.
Returns: Hostname portion of URL.
toggle(key)
Parameters: key – Key to toggle.
Toggle a key between True/False state. Example:
>>> toggle('my-key')
True
>>> toggle('my-key')
False
>>> toggle('my-key')
True
setting(key[, value=None])
Returns: Value associated with key.
Store/retrieve a setting in memory and persist during lifetime of application. To get the current value, only specify the key. To set a new value, call with key and new value.
clear_toggles()
Clears all state associated with the toggle() function.
clear_settings()
Clears all state associated with the setting() function.
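The behavior described for toggle(), setting(), and the two clear functions amounts to process-local dictionaries. A minimal sketch, assuming that model (all names ending in _sketch are hypothetical and this is not the library's code):

```python
_toggle_state = {}
_setting_state = {}


def toggle_sketch(key):
    # A key that has never been seen starts out False, so the first call returns True.
    _toggle_state[key] = not _toggle_state.get(key, False)
    return _toggle_state[key]


def setting_sketch(key, value=None):
    # With no value, read the current setting; otherwise store the new one.
    if value is None:
        return _setting_state.get(key)
    _setting_state[key] = value
    return value


def clear_toggles_sketch():
    _toggle_state.clear()


def clear_settings_sketch():
    _setting_state.clear()
```

Since the state lives in module-level dictionaries, it lasts only as long as the Python process, which matches the "lifetime of application" wording above.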
MATH
randomrange(start[, stop=None[, step=None]])
Return a random integer in the range [start, stop).
gauss_distribution(mean, sigma)
sqrt(n)
Calculate the square root of n.
tonumber(s)
Parameters: s (str) – String to convert to number.
Returns: Integer, floating-point or NULL on failure.
mode(val)
Parameters: val – Numbers in list.
Returns: The mode, or most-common, number observed.
Aggregate function which calculates mode of values.
minrange(val)
Parameters: val – Value
Returns: Min difference between two values.
Aggregate function which calculates the minimal distance between two numbers in the sequence.
avgrange(val)
Parameters: val – Value
Returns: Average difference between values.
Aggregate function which calculates the average distance between two consecutive numbers in the sequence.
range(val)
Parameters: val – Value
Returns: The range from the smallest to largest value in sequence.
Aggregate function which returns range of values observed.
median(val)
Parameters: val – Value
Returns: The median, or middle, value in a sequence.
Aggregate function which calculates the middle value in a sequence.
Note
Only available if you compiled the _sqlite_udf extension.
STRING
substr_count(haystack, needle)
Returns number of times needle appears in haystack.
strip_chars(haystack, chars)
Strips any characters in chars from beginning and end of haystack.
damerau_levenshtein_dist(s1, s2)
Computes the edit distance from s1 to s2 using the Damerau variant of the Levenshtein algorithm.
Note
Only available if you compiled the _sqlite_udf extension.
levenshtein_dist(s1, s2)
Computes the edit distance from s1 to s2 using the Levenshtein algorithm.
Note
Only available if you compiled the _sqlite_udf extension.
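For readers unfamiliar with edit distance, the classic dynamic-programming formulation of the Levenshtein algorithm can be sketched in pure Python. levenshtein_sketch is a hypothetical name and this is not the compiled extension's code; it is only meant to show what the returned number means.

```python
def levenshtein_sketch(s1, s2):
    # previous[j] holds the distance between the processed prefix of s1
    # and the first j characters of s2.
    previous = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1, 1):
        current = [i]
        for j, c2 in enumerate(s2, 1):
            cost = 0 if c1 == c2 else 1
            current.append(min(
                previous[j] + 1,         # delete c1
                current[j - 1] + 1,      # insert c2
                previous[j - 1] + cost,  # substitute (or keep if equal)
            ))
        previous = current
    return previous[-1]


print(levenshtein_sketch('kitten', 'sitting'))  # 3
```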
str_dist(s1, s2)
Computes the edit distance from s1 to s2 using the standard library SequenceMatcher’s algorithm.
Note
Only available if you compiled the _sqlite_udf extension.
regex_search(regex, search_string)
Table-valued function that searches a string for substrings that match the provided regex. Returns rows for each match found.
Example:
SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols');
value
-----
extract
words
ignore
symbols
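The rows in this example correspond to what the standard library re module yields for the same pattern. The generator below is a hypothetical stand-in for illustration, not the table function itself:

```python
import re


def regex_search_sketch(regex, search_string):
    # Yield one value per match, analogous to one row per match.
    for match in re.finditer(regex, search_string):
        yield match.group(0)


words = list(regex_search_sketch(r'\w+', 'extract words, ignore! symbols'))
```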
apsw, an advanced sqlite driver
The apsw_ext module contains a database class suitable for use with the apsw sqlite driver.
APSW Project page: https://github.com/rogerbinns/apsw
APSW is a really neat library that provides a thin wrapper on top of SQLite’s C interface, making it possible to use all of SQLite’s advanced features.
Here are just a few reasons to use APSW, taken from the documentation:
- APSW gives all functionality of SQLite, including virtual tables, virtual file system, blob i/o, backups and file control.
- Connections can be shared across threads without any additional locking.
- Transactions are managed explicitly by your code.
- APSW can handle nested transactions.
- Unicode is handled correctly.
- APSW is faster.
For more information on the differences between apsw and pysqlite, check the apsw docs.
How to use the APSWDatabase
from playhouse.apsw_ext import *
db = APSWDatabase(':memory:')
class BaseModel(Model):
    class Meta:
        database = db
class SomeModel(BaseModel):
    col1 = CharField()
    col2 = DateTimeField()
apsw_ext API notes
APSWDatabase extends the SqliteExtDatabase and inherits its advanced features.
class APSWDatabase(database, **connect_kwargs)
register_module(mod_name, mod_inst)
Provides a way of globally registering a module. For more information, see the documentation on virtual tables.
Parameters:
- mod_name (string) – name to use for module
- mod_inst (object) – an object implementing the Virtual Table interface
unregister_module(mod_name)
Unregister a module.
Parameters: mod_name (string) – name to use for module
Note
Be sure to use the Field subclasses defined in the apsw_ext module, as they will properly handle adapting the data types for storage.
For example, instead of using peewee.DateTimeField, be sure you are importing and using playhouse.apsw_ext.DateTimeField.
Sqlcipher backend
Note
Although this extension’s code is short, it has not been properly peer-reviewed yet and may have introduced vulnerabilities.
Also note that this code relies on sqlcipher3 (python bindings) and sqlcipher, and the code there might have vulnerabilities as well, but since these are widely used crypto modules, we can expect “short zero days” there.
sqlcipher_ext API notes
class SqlCipherDatabase(database, passphrase, **kwargs)
Subclass of SqliteDatabase that stores the database encrypted. Instead of the standard sqlite3 backend, it uses sqlcipher3: a python wrapper for sqlcipher, which, in turn, is an encrypted wrapper around sqlite3, so the API is identical to SqliteDatabase’s, except for object construction parameters:
- If the database file doesn’t exist, it will be created with encryption by a key derived from passphrase.
- When trying to open an existing database, passphrase should be identical to the one used when it was created. If the passphrase is incorrect, an error will be raised when first attempting to access the database.
rekey(passphrase)
Parameters: passphrase (str) – New passphrase for database.
Change the passphrase for database.
Note
SQLCipher can be configured using a number of extension PRAGMAs. The list of PRAGMAs and their descriptions can be found in the SQLCipher documentation.
For example to specify the number of PBKDF2 iterations for the key derivation (64K in SQLCipher 3.x, 256K in SQLCipher 4.x by default):
# Use 1,000,000 iterations.
db = SqlCipherDatabase('my_app.db', pragmas={'kdf_iter': 1000000})
To use a cipher page-size of 16KB and a cache-size of 10,000 pages:
db = SqlCipherDatabase('my_app.db', passphrase='secret!!!', pragmas={
    'cipher_page_size': 1024 * 16,
    'cache_size': 10000})  # 10,000 16KB pages, or 160MB.
Example of prompting the user for a passphrase:
from peewee import *
from playhouse.sqlcipher_ext import SqlCipherDatabase
db = SqlCipherDatabase(None)
class BaseModel(Model):
    """Parent for all app's models"""
    class Meta:
        # We won't have a valid db until user enters passphrase.
        database = db
# Derive our model subclasses
class Person(BaseModel):
    name = TextField(primary_key=True)
right_passphrase = False
while not right_passphrase:
    db.init(
        'testsqlcipher.db',
        passphrase=get_passphrase_from_user())
    try:  # Actually execute a query against the db to test passphrase.
        db.get_tables()
    except DatabaseError as exc:
        # This error indicates the password was wrong.
        if exc.args[0] == 'file is encrypted or is not a database':
            tell_user_the_passphrase_was_wrong()
            db.init(None)  # Reset the db.
        else:
            raise exc
    else:
        # The password was correct.
        right_passphrase = True
See also: a slightly more elaborate example.
Postgresql Extensions
The postgresql extensions module provides a number of “postgres-only” functions, currently:
- json support, including jsonb for Postgres 9.4.
- hstore support
- server-side cursors
- full-text search
- ArrayField field type, for storing arrays.
- HStoreField field type, for storing key/value pairs.
- IntervalField field type, for storing timedelta objects.
- JSONField field type, for storing JSON data.
- BinaryJSONField field type for the jsonb JSON data type.
- TSVectorField field type, for storing full-text search data.
- DateTimeTZField field type, a timezone-aware datetime field.
In the future I would like to add support for more of postgresql’s features. If there is a particular feature you would like to see added, please open a Github issue.
Warning
In order to start using the features described below, you will need to use the extension PostgresqlExtDatabase class instead of PostgresqlDatabase.
The code below will assume you are using the following database and base model:
from playhouse.postgres_ext import *
ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')
class BaseExtModel(Model):
    class Meta:
        database = ext_db
JSON Support
peewee has basic support for Postgres’ native JSON data type, in the form of JSONField. As of version 2.4.7, peewee also supports the Postgres 9.4 binary json jsonb type, via BinaryJSONField.
Warning
Postgres supports a JSON data type natively as of 9.2 (full support in 9.3). In order to use this functionality you must be using the correct version of Postgres with psycopg2 version 2.5 or greater.
To use BinaryJSONField, which has many performance and querying advantages, you must have Postgres 9.4 or later.
Note
You must be sure your database is an instance of PostgresqlExtDatabase in order to use the JSONField.
Here is an example of how you might declare a model with a JSON field:
import json
from urllib.request import urlopen
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_database')
class APIResponse(Model):
    url = CharField()
    response = JSONField()
    class Meta:
        database = db
    @classmethod
    def request(cls, url):
        fh = urlopen(url)
        return cls.create(url=url, response=json.loads(fh.read()))
APIResponse.create_table()
# Store a JSON response.
offense = APIResponse.request('http://crime-api.com/api/offense/')
booking = APIResponse.request('http://crime-api.com/api/booking/')
# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
    APIResponse.response['meta']['model'] == 'offense')
# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
     .select(
         APIResponse.response['booking']['person'].as_json().alias('person'))
     .where(APIResponse.response['meta']['model'] == 'booking'))
for result in q:
    print(result.person['name'], result.person['dob'])
The BinaryJSONField works the same and supports the same operations as the regular JSONField, but provides several additional operations for testing containment. Using the binary json field, you can test whether your JSON data contains other partial JSON structures (contains(), contains_any(), contains_all()), or whether it is a subset of a larger JSON document (contained_by()).
For more examples, see the JSONField and BinaryJSONField API documents below.
hstore support
Postgresql hstore is an embedded key/value store. With hstore, you can store arbitrary key/value pairs in your database alongside structured relational data.
To use hstore, you need to specify an additional parameter when instantiating your PostgresqlExtDatabase:
# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)
Currently the postgres_ext module supports the following operations:
- Store and retrieve arbitrary dictionaries
- Filter by key(s) or partial dictionary
- Update/add one or more keys to an existing dictionary
- Delete one or more keys from an existing dictionary
- Select keys, values, or zip keys and values
- Retrieve a slice of keys/values
- Test for the existence of a key
- Test that a key has a non-NULL value
Using hstore
To start with, you will need to import the custom database class and the hstore functions from playhouse.postgres_ext (see above code snippet). Then, it is as simple as adding a HStoreField to your model:
class House(BaseExtModel):
    address = CharField()
    features = HStoreField()
You can now store arbitrary key/value pairs on House instances:
>>> h = House.create(
...     address='123 Main St',
...     features={'garage': '2 cars', 'bath': '2 bath'})
...
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}
You can filter by individual key, multiple keys or partial dictionary:
>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
Suppose you want to do an atomic update to the house:
>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}
Or, alternatively an atomic delete:
>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}
Multiple keys can be deleted at the same time:
>>> query = House.update(features=House.features.delete('garage', 'sqft'))
You can select just keys, just values, or zip the two:
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)
123 Main St [u'bath', u'garage']
>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
You can retrieve a slice of data, for example, all the garage data:
>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
...     print(house.address, house.garage_data)
123 Main St {'garage': '2 cars'}
You can check for the existence of a key and filter rows accordingly:
>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
...     print(house.address, house.has_garage)
123 Main St True
>>> for house in House.select().where(House.features.exists('garage')):
...     print(house.address, house.features['garage'])  # <-- just houses w/garage data
123 Main St 2 cars
Interval support
Postgres supports durations through the INTERVAL data-type (docs).
class IntervalField([null=False[, …]])
Field class capable of storing Python datetime.timedelta instances.
Example:
from datetime import timedelta
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_db')
class Event(Model):
    location = CharField()
    duration = IntervalField()
    start_time = DateTimeField()
    class Meta:
        database = db
    @classmethod
    def get_long_meetings(cls):
        return cls.select().where(cls.duration > timedelta(hours=1))
Server-side cursors
When psycopg2 executes a query, normally all results are fetched and returned to the client by the backend. This can cause your application to use a lot of memory when making large queries. Using server-side cursors, results are returned a little at a time (by default 2000 records). For the definitive reference, please see the psycopg2 documentation.
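The memory benefit comes from consuming results in bounded chunks instead of materializing the whole result set. The sketch below shows that consumption pattern with the standard library sqlite3 module as a client-side analogy only; it is not a server-side cursor and not psycopg2 code. The helper iter_in_batches and the page_view table are hypothetical, and the batch size of 2000 simply mirrors the default mentioned above.

```python
import sqlite3


def iter_in_batches(cursor, batch_size=2000):
    # Pull at most batch_size rows at a time and hand them out one by one.
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            yield row


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE page_view (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO page_view (id) VALUES (?)',
                 [(i,) for i in range(5000)])

cursor = conn.execute('SELECT id FROM page_view ORDER BY id')
# Only one batch of rows is held in the helper at any moment.
total = sum(1 for _ in iter_in_batches(cursor))
```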
Note
To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.
To execute a query using a server-side cursor, simply wrap your select query using the ServerSide() helper:
large_query = PageView.select()  # Build query normally.
# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
    # do some interesting analysis here.
    pass
# Server-side resources are released.
If you would like all SELECT queries to automatically use a server-side cursor, you can specify this when creating your PostgresqlExtDatabase:
from playhouse.postgres_ext import PostgresqlExtDatabase
ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)
Note
Server-side cursors live only as long as the transaction, so for this reason peewee will not automatically call commit() after executing a SELECT query. If you do not commit after you are done iterating, you will not release the server-side resources until the connection is closed (or the transaction is committed later). Furthermore, since peewee will by default cache rows returned by the cursor, you should always call .iterator() when iterating over a large query.
If you are using the ServerSide() helper, the transaction and call to iterator() will be handled transparently.
Full-text search
Postgresql provides sophisticated full-text search using special data-types (tsvector and tsquery). Documents should be stored or converted to the tsvector type, and search queries should be converted to tsquery.
For simple cases, you can simply use the Match() function, which will automatically perform the appropriate conversions, and requires no schema changes:
def blog_search(search_term):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, search_term))
The Match() function will automatically convert the left-hand operand to a tsvector, and the right-hand operand to a tsquery. For better performance, it is recommended you create a GIN index on the column you plan to search:
CREATE INDEX blog_full_text_search ON blog USING gin(to_tsvector(content));
Alternatively, you can use the TSVectorField to maintain a dedicated column for storing tsvector data:
class Blog(Model):
    content = TextField()
    search_content = TSVectorField()
Note
TSVectorField will automatically be created with a GIN index.
You will need to explicitly convert the incoming text data to tsvector when inserting or updating the search_content field:
content = 'Excellent blog post about peewee ORM.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))
To perform a full-text search, use TSVectorField.match():
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))
For more information, see the Postgres full-text search docs.
postgres_ext API notes
class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=False[, …]]])
Identical to PostgresqlDatabase but required in order to support the features described in this section.
If you wish to use the HStore extension, you must specify register_hstore=True.
If using server_side_cursors, also be sure to wrap your queries with ServerSide().
ServerSide(select_query)
Parameters: select_query – a SelectQuery instance.
Return type: generator
Wrap the given select query in a transaction, and call its iterator() method to avoid caching row instances. In order for the server-side resources to be released, be sure to exhaust the generator (iterate over all the rows).
Usage:
large_query = PageView.select()
for page_view in ServerSide(large_query):
    # Do something interesting.
    pass
# At this point server side resources are released.
class ArrayField([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
Field capable of storing arrays of the provided field_class.
Note
By default ArrayField will use a GIN index. To disable this, initialize the field with index=False.
You can store and retrieve lists (or lists-of-lists):
class BlogPost(BaseModel):
    content = TextField()
    tags = ArrayField(CharField)
post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])
Additionally, you can use the __getitem__ API to query values or slices in the database:
# Get the first tag on a given blog post.
first_tag = (BlogPost
             .select(BlogPost.tags[0].alias('first_tag'))
             .where(BlogPost.id == 1)
             .dicts()
             .get())
# first_tag = {'first_tag': 'foo'}
Get a slice of values:
# Get the first two tags.
two_tags = (BlogPost
            .select(BlogPost.tags[:2].alias('two'))
            .dicts()
            .get())
# two_tags = {'two': ['foo', 'bar']}
contains(*items)
Parameters: items – One or more items that must be in the given array field.
# Get all blog posts that are tagged with both "python" and "django".
BlogPost.select().where(BlogPost.tags.contains('python', 'django'))
contains_any(*items)
Parameters: items – One or more items to search for in the given array field.
Like contains(), except will match rows where the array contains any of the given items.
# Get all blog posts that are tagged with "flask" and/or "django".
BlogPost.select().where(BlogPost.tags.contains_any('flask', 'django'))
class DateTimeTZField(*args, **kwargs)
A timezone-aware subclass of DateTimeField.
class HStoreField(*args, **kwargs)
A field for storing and retrieving arbitrary key/value pairs. For details on usage, see hstore support.
Attention
To use the HStoreField you will need to be sure the hstore extension is registered with the connection. To accomplish this, instantiate the PostgresqlExtDatabase with register_hstore=True.
Note
By default HStoreField will use a GiST index. To disable this, initialize the field with index=False.
keys()
Returns the keys for a given row.
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)
123 Main St [u'bath', u'garage']
values()
Return the values for a given row.
>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
items()
Like python’s dict, return the keys and values in a list-of-lists:
>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
slice(*args)
Return a slice of data given a list of keys.
>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')):
...     print(h.address, h.garage_data)
123 Main St {'garage': '2 cars'}
exists(key)
Query for whether the given key exists.
>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')):
...     print(h.address, h.has_garage)
123 Main St True
>>> for h in House.select().where(House.features.exists('garage')):
...     print(h.address, h.features['garage'])  # <-- just houses w/garage data
123 Main St 2 cars
defined
(key)Query for whether the given key has a value associated with it.
update
(**data)
Perform an atomic update to the keys/values for a given row or rows.
>>> query = House.update(features=House.features.update(
...     sqft=2000,
...     year_built=2012))
>>> query.where(House.id == 1).execute()
delete
(*keys)
Delete the provided keys for a given row or rows.
Note
We will use an UPDATE query.
>>> query = House.update(features=House.features.delete(
...     'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()
contains
(value)
Parameters: value – Either a dict, a list of keys, or a single key.
Query rows for the existence of either:
- a partial dictionary.
- a list of keys.
- a single key.
>>> query = House.select()
>>> has_garage = query.where(House.features.contains('garage'))
>>> garage_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
contains_any
(*keys)
Parameters: keys – One or more keys to search for.
Query rows for the existence of any key.
class JSONField
(dumps=None, *args, **kwargs)
Parameters: dumps – Function used to serialize values to JSON. Defaults to json.dumps(); supply your own function to customize serialization.
Field class suitable for storing and querying arbitrary JSON. When using this on a model, set the field’s value to a Python object (either a dict or a list). When you retrieve your value from the database it will be returned as a Python data structure.
Note
You must be using Postgres 9.2 / psycopg2 2.5 or greater.
Note
If you are using Postgres 9.4, strongly consider using the BinaryJSONField instead as it offers better performance and more powerful querying options.
Example model declaration:
db = PostgresqlExtDatabase('my_db')
class APIResponse(Model):
    url = CharField()
    response = JSONField()
    class Meta:
        database = db
Example of storing JSON data:
import json
from urllib.request import urlopen
url = 'http://foo.com/api/resource/'
resp = json.loads(urlopen(url).read())
APIResponse.create(url=url, response=resp)
APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})
To query, use Python’s [] operators to specify nested key or array lookups:
APIResponse.select().where(
APIResponse.response['key1']['nested-key'] == 'some-value')
To illustrate the use of the [] operators, imagine we have the following data stored in an APIResponse:
{
"foo": {
"bar": ["i1", "i2", "i3"],
"baz": {
"huey": "mickey",
"peewee": "nugget"
}
}
}
Here are the results of a few queries:
def get_data(expression):
    # Helper function to just retrieve the results of a
    # particular expression.
    query = (APIResponse
             .select(expression.alias('my_data'))
             .dicts()
             .get())
    return query['my_data']
# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.data['foo']['bar'])
# '["i1", "i2", "i3"]'
# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.data['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']
# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.data['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'
# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.data['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}
# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.data['foo']['bar'][0])
# 'i1'
# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.data['foo']['bar'][0].as_json())
# 'i1'
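The difference between the JSON text and the decoded Python value described above can be reproduced with the standard-library json module alone. This is only an illustration of the two representations; it does not touch the database, and the variable names are made up for the example.

```python
import json

# The value stored under foo -> bar in the example document.
bar = ["i1", "i2", "i3"]

# Without .as_json(), the sub-document comes back as JSON text.
encoded = json.dumps(bar)
print(encoded)  # ["i1", "i2", "i3"]

# With .as_json(), the text is decoded into Python objects, roughly
# what json.loads() does here.
decoded = json.loads(encoded)
print(decoded)  # ['i1', 'i2', 'i3']
```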
class BinaryJSONField
(dumps=None, *args, **kwargs)
Parameters: dumps – Function used to serialize values to JSON. Defaults to json.dumps(); supply your own function to customize serialization.
Store and query arbitrary JSON documents. Data should be stored using normal Python dict and list objects, and when data is returned from the database, it will be returned using dict and list as well.
For examples of basic query operations, see the above code samples for JSONField. The example queries below will use the same APIResponse model described above.
Note
By default BinaryJSONField will use a GiST index. To disable this, initialize the field with index=False.
Note
You must be using Postgres 9.4 / psycopg2 2.5 or newer. If you are using Postgres 9.2 or 9.3, you can use the regular JSONField instead.
contains
(other)
Test whether the given JSON data contains the given JSON fragment or key.
Example:
search_fragment = {
'foo': {'bar': ['i2']}
}
query = (APIResponse
.select()
.where(APIResponse.data.contains(search_fragment)))
# If we're searching for a list, the list items do not need to
# be ordered in a particular way:
query = (APIResponse
.select()
.where(APIResponse.data.contains({
'foo': {'bar': ['i2', 'i1']}})))
We can pass in simple keys as well. To find APIResponses that contain the key foo at the top-level:
APIResponse.select().where(APIResponse.data.contains('foo'))
We can also search sub-keys using square-brackets:
APIResponse.select().where(
APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
contains_any
(*items)
Search for the presence of one or more of the given items.
APIResponse.select().where(
APIResponse.data.contains_any('foo', 'baz', 'nugget'))
Like contains(), we can also search sub-keys:
APIResponse.select().where(
APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
contains_all
(*items)
Search for the presence of all of the given items.
APIResponse.select().where(
APIResponse.data.contains_all('foo'))
Like contains_any(), we can also search sub-keys:
APIResponse.select().where(
APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
contained_by
(other)
Test whether the given JSON document is contained by (is a subset of) the given JSON document. This method is the inverse of contains().
big_doc = {
'foo': {
'bar': ['i1', 'i2', 'i3'],
'baz': {
'huey': 'mickey',
'peewee': 'nugget',
}
},
'other_key': ['nugget', 'bear', 'kitten'],
}
APIResponse.select().where(
APIResponse.data.contained_by(big_doc))
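The containment rules used by contains() and contained_by() can be approximated in plain Python. The json_contains helper below is a hypothetical sketch of the general semantics (objects match by key, list order is ignored, scalars must be equal); it is not the library’s implementation, and the database may treat some edge cases differently.

```python
def json_contains(doc, fragment):
    """Approximate containment test: does `doc` contain `fragment`?"""
    if isinstance(fragment, dict):
        # Every key of the fragment must exist in the document, and the
        # corresponding values must be contained recursively.
        return (isinstance(doc, dict) and
                all(key in doc and json_contains(doc[key], value)
                    for key, value in fragment.items()))
    if isinstance(fragment, list):
        # Order and duplicates are ignored: each fragment item only needs
        # to be contained in some item of the document's list.
        return (isinstance(doc, list) and
                all(any(json_contains(item, wanted) for item in doc)
                    for wanted in fragment))
    # Scalars must match exactly.
    return doc == fragment


stored = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {'huey': 'mickey', 'peewee': 'nugget'},
    }
}

print(json_contains(stored, {'foo': {'bar': ['i2', 'i1']}}))  # True
print(json_contains(stored, {'foo': {'bar': ['ix']}}))  # False

# contained_by() is the same test with the operands swapped.
big_doc = dict(stored, other_key=['nugget', 'bear', 'kitten'])
print(json_contains(big_doc, stored))  # True
```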
concat
(data)
Concatenate the field data and the provided data. Note that this operation does not merge or do a “deep concat”.
has_key
(key)
Test whether the key exists at the top-level of the JSON object.
remove
(*keys)
Remove one or more keys from the top-level of the JSON object.
Match
(field, query)
Generate a full-text search expression, automatically converting the left-hand operand to a tsvector, and the right-hand operand to a tsquery.
Example:
def blog_search(search_term):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, search_term))
class TSVectorField
Field type suitable for storing tsvector data. This field will automatically be created with a GIN index for improved search performance.
Note
Data stored in this field will still need to be manually converted to the tsvector type.
Note
By default TSVectorField will use a GIN index. To disable this, initialize the field with index=False.
Example usage:
class Blog(Model):
    content = TextField()
    search_content = TSVectorField()
content = 'this is a sample blog entry.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))  # Note `to_tsvector()`.
match
(query[, language=None[, plain=False]])
Parameters:
- query (str) – the full-text search query.
- language (str) – language name (optional).
- plain (bool) – parse search query using plain (simple) parser.
Returns: an expression representing full-text search/match.
Example:
# Perform a search using the "match" method.
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))
Cockroach Database
CockroachDB (CRDB) is well supported by peewee.
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app', user='root', host='10.1.0.8')
If you are using Cockroach Cloud, you may find it easier to specify the connection parameters using a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')
Note
CockroachDB requires the psycopg2 (postgres) Python driver.
Note
The CockroachDB installation and getting-started guide can be found here: https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html
SSL Configuration
SSL certificates are strongly recommended when running a Cockroach cluster. Psycopg2 supports SSL out-of-the-box, but you may need to specify some additional options when initializing your database:
db = CockroachDatabase(
'my_app',
user='root',
host='10.1.0.8',
sslmode='verify-full', # Verify the cert common-name.
sslrootcert='/path/to/root.crt')
# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
'?sslmode=verify-full&sslrootcert=/path/to/root.crt'
'&options=--cluster=my-cluster-xyz')
More details about client verification can be found in the libpq docs.
Cockroach Extension APIs
The playhouse.cockroachdb extension module provides the following classes and helpers:
- CockroachDatabase - a subclass of PostgresqlDatabase, designed specifically for working with CRDB.
- PooledCockroachDatabase - like the above, but implements connection-pooling.
- run_transaction() - runs a function inside a transaction and provides automatic client-side retry logic.
Special field-types that may be useful when using CRDB:
- UUIDKeyField - a primary-key field implementation that uses CRDB’s UUID type with a default randomly-generated UUID.
- RowIDField - a primary-key field implementation that uses CRDB’s INT type with a default unique_rowid().
- JSONField - same as the Postgres BinaryJSONField, as CRDB treats JSON as JSONB.
- ArrayField - same as the Postgres extension (but does not support multi-dimensional arrays).
CRDB is compatible with Postgres’ wire protocol and exposes a very similar SQL interface, so it is possible (though not recommended) to use PostgresqlDatabase with CRDB. CockroachDatabase is the better choice for the following reasons:
- CRDB does not support nested transactions (savepoints), so the atomic() method has been implemented to enforce this when using CockroachDatabase. For more info, see CRDB Transactions.
- CRDB may have subtle differences in field-types, date functions and introspection from Postgres.
- CRDB-specific features are exposed by the CockroachDatabase, such as specifying a transaction priority or the AS OF SYSTEM TIME clause.
CRDB Transactions
CRDB does not support nested transactions (savepoints), so the atomic() method on the CockroachDatabase has been modified to raise an exception if an invalid nesting is encountered. If you would like to be able to nest transactional code, you can use the transaction() method, which will ensure that the outer-most block will manage the transaction (e.g., exiting a nested-block will not cause an early commit).
Example:
@db.transaction()
def create_user(username):
    return User.create(username=username)
def some_other_function():
    with db.transaction() as txn:
        # do some stuff...
        # This function is wrapped in a transaction, but the nested
        # transaction will be ignored and folded into the outer
        # transaction, as we are already in a wrapped-block (via the
        # context manager).
        create_user('some_user@example.com')
        # do other stuff.
    # At this point we have exited the outer-most block and the transaction
    # will be committed.
    return
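The “outer-most block manages the transaction” behaviour described above can be sketched with a depth counter. FakeDatabase below is a hypothetical stand-in that only records what would be sent to the server; it is an assumption about the general technique, not the CockroachDatabase implementation.

```python
import contextlib


class FakeDatabase:
    """Hypothetical stand-in that records BEGIN/COMMIT/ROLLBACK calls."""

    def __init__(self):
        self.depth = 0
        self.log = []

    @contextlib.contextmanager
    def transaction(self):
        outermost = self.depth == 0
        if outermost:
            self.log.append('BEGIN')
        self.depth += 1
        try:
            yield self
        except Exception:
            # Only the outer-most block issues the rollback.
            if outermost:
                self.log.append('ROLLBACK')
            raise
        else:
            # Exiting a nested block does not commit early.
            if outermost:
                self.log.append('COMMIT')
        finally:
            self.depth -= 1


db = FakeDatabase()
with db.transaction():
    with db.transaction():  # Folded into the outer transaction.
        db.log.append('INSERT user')
    db.log.append('UPDATE stats')

print(db.log)  # ['BEGIN', 'INSERT user', 'UPDATE stats', 'COMMIT']
```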
CRDB provides client-side transaction retries, which are available using a special run_transaction() helper. This helper method accepts a callable, which is responsible for executing any transactional statements that may need to be retried.
Simplest possible example of run_transaction():
def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)
    return db.run_transaction(callback, max_attempts=10)
huey = create_user('huey@example.com')
Note
The cockroachdb.ExceededMaxAttempts exception will be raised if the transaction cannot be committed after the given number of attempts. If the SQL is malformed, violates a constraint, etc., then the function will raise the exception to the caller.
Example of using run_transaction() to implement client-side retries for a transaction that transfers an amount from one account to another:
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app')
def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    not sufficient funds, then the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.
        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance
        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.balance)
                 .execute())
        return True, src.balance, dest.balance
    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be
    # automatically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)
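The retry behaviour itself can be pictured as a bounded loop around the callback. The sketch below is an assumption about the general shape of such a helper, not the library’s code: RetryableError and run_with_retries are hypothetical names, ExceededMaxAttempts is redefined locally, and the callback receives the attempt number (rather than a database instance) so the example can run without a server.

```python
class RetryableError(Exception):
    """Hypothetical marker for an error that is safe to retry."""


class ExceededMaxAttempts(Exception):
    """Local stand-in for the exception of the same name."""


def run_with_retries(callback, max_attempts=None):
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        attempt += 1
        try:
            # A real implementation would begin a transaction here and
            # commit it after the callback returns.
            return callback(attempt)
        except RetryableError:
            continue  # Contention: roll back and try again.
    raise ExceededMaxAttempts('gave up after %d attempts' % attempt)


def flaky(attempt):
    # Fails twice because of simulated contention, then succeeds.
    if attempt < 3:
        raise RetryableError()
    return 'committed on attempt %d' % attempt


print(run_with_retries(flaky, max_attempts=10))  # committed on attempt 3
```

Because the callback may run several times, it should be free of side effects other than the SQL it issues, which is why the callback rules for run_transaction() ask for SQL operations only.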
CRDB APIs
class CockroachDatabase
(database[, **kwargs])
CockroachDB implementation, based on the PostgresqlDatabase and using the psycopg2 driver.
Additional keyword arguments are passed to the psycopg2 connection constructor, and may be used to specify the database user, port, etc.
Alternatively, the connection details can be specified in URL-form.
run_transaction
(callback[, max_attempts=None[, system_time=None[, priority=None]]])
Parameters:
- callback – callable that accepts a single db parameter (which will be the database instance this method is called from).
- max_attempts (int) – max number of times to try before giving up.
- system_time (datetime) – execute the transaction AS OF SYSTEM TIME with respect to the given value.
- priority (str) – either “low”, “normal” or “high”.
Returns: the value returned by the callback.
Raises: ExceededMaxAttempts if max_attempts is exceeded.
Run SQL in a transaction with automatic client-side retries.
User-provided callback:
- Must accept one parameter, the db instance representing the connection the transaction is running under.
- Must not attempt to commit, rollback or otherwise manage the transaction.
- May be called more than one time.
- Should ideally only contain SQL operations.
Additionally, the database must not have any open transactions at the time this function is called, as CRDB does not support nested transactions. Attempting to do so will raise a NotImplementedError.
Simplest possible example:
def create_user(email):
    def callback(db_ref):
        return User.create(email=email)
    return db.run_transaction(callback, max_attempts=10)
user = create_user('huey@example.com')
class PooledCockroachDatabase
(database[, **kwargs])
CockroachDB connection-pooling implementation, based on PooledPostgresqlDatabase. Implements the same APIs as CockroachDatabase, but will do client-side connection pooling.
run_transaction
(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])
Run SQL in a transaction with automatic client-side retries. See CockroachDatabase.run_transaction() for details.
Note
This function is equivalent to the identically-named method on the CockroachDatabase class.
class UUIDKeyField
UUID primary-key field that uses the CRDB gen_random_uuid() function to automatically populate the initial value.
class RowIDField
Auto-incrementing integer primary-key field that uses the CRDB unique_rowid() function to automatically populate the initial value.
See also:
- BinaryJSONField from the Postgresql extension (available in the cockroachdb extension module, and aliased to JSONField).
- ArrayField from the Postgresql extension.
MySQL Extensions
Peewee provides alternate database implementations for using the mysql-connector driver or the mariadb-connector. The implementations can be found in playhouse.mysql_ext.
Example usage of mysql-connector:
from playhouse.mysql_ext import MySQLConnectorDatabase
# MySQL database implementation that utilizes mysql-connector driver.
db = MySQLConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
Example usage of mariadb-connector:
from playhouse.mysql_ext import MariaDBConnectorDatabase
# MySQL database implementation that utilizes mariadb-connector driver.
db = MariaDBConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
Note
The MariaDBConnectorDatabase does not accept the following parameters:
- charset (it is always utf8mb4)
- sql_mode
- use_unicode
Additional MySQL-specific helpers:
class JSONField
Extends TextField and implements transparent JSON encoding and decoding in Python.
Match
(columns, expr[, modifier=None])
Helper class for constructing MySQL full-text search queries of the form:
MATCH (columns, ...) AGAINST (expr[ modifier])
DataSet
The dataset module contains a high-level API for working with databases modeled after the popular project of the same name. The aims of the dataset module are to provide:
- A simplified API for working with relational data, along the lines of working with JSON.
- An easy way to export relational data as JSON or CSV.
- An easy way to import JSON or CSV data into a relational database.
A minimal data-loading script might look like this:
from playhouse.dataset import DataSet
db = DataSet('sqlite:///:memory:')
table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')
huey = table.find_one(name='Huey')
print(huey)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
for obj in table:
    print(obj)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}
You can insert, update or delete using the dictionary APIs as well:
huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# Perform an update by supplying a partial record of changes.
table[1] = {'gender': 'male', 'age': 4}
print(table[1])
# {'age': 4, 'gender': 'male', 'id': 1, 'name': 'Huey'}
# Or insert a new record:
table[3] = {'name': 'Zaizee', 'age': 2}
print(table[3])
# {'age': 2, 'gender': None, 'id': 3, 'name': 'Zaizee'}
# Or delete a record:
del table[3] # Remove the row we just added.
You can export or import data using freeze() and thaw():
# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')
# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')
Getting started
DataSet objects are initialized by passing in a database URL of the format dialect://user:password@host/dbname. See the Database URL section for examples of connecting to various databases.
# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')
Storing data
To store data, we must first obtain a reference to a table. If the table does not exist, it will be created automatically:
# Get a table reference, creating the table if it does not exist.
table = db['users']
We can now insert() new rows into the table. If the columns do not exist, they will be created automatically:
table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')
To update existing entries in the table, pass in a dictionary containing the new values and filter conditions. The list of columns to use as filters is specified in the columns argument. If no filter columns are specified, then all rows will be updated.
# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])
# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')
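To make the role of the columns argument concrete, the following sketch translates the same calling convention into a plain SQL UPDATE using the standard-library sqlite3 module. update_rows is a hypothetical helper written for this illustration; it is not part of the dataset module, it does not create missing columns, and it interpolates table and column names directly, so it should only be used with trusted names.

```python
import sqlite3


def update_rows(conn, table, columns=None, **data):
    """Keys named in `columns` become filters; all other keys are set."""
    filters = {key: data[key] for key in (columns or [])}
    changes = {key: value for key, value in data.items()
               if key not in filters}
    sql = 'UPDATE %s SET %s' % (
        table, ', '.join('%s = ?' % key for key in changes))
    params = list(changes.values())
    if filters:
        sql += ' WHERE ' + ' AND '.join('%s = ?' % key for key in filters)
        params += list(filters.values())
    return conn.execute(sql, params).rowcount


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (name TEXT, gender TEXT, favorite_orm TEXT)')
conn.executemany('INSERT INTO users (name) VALUES (?)',
                 [('Huey',), ('Mickey',)])

# Only Huey's row matches the filter column.
print(update_rows(conn, 'users', columns=['name'],
                  name='Huey', gender='male'))  # 1

# No filter columns: every row is updated.
print(update_rows(conn, 'users', favorite_orm='peewee'))  # 2
```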
Importing data
To import data from an external source, such as a JSON or CSV file, you can use the thaw() method. By default, new columns will be created for any attributes encountered. If you wish to only populate columns that are already defined on a table, you can pass in strict=True.
# Load data from a JSON file containing a list of objects.
table = db['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]
# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
{'id': 2, 'ticker': 'AAPL', 'price': 109},
{'id': 3, 'ticker': 'AMZN', 'price': 300}]
Using transactions
DataSet supports nesting transactions using a simple context manager.
table = db['users']
with db.transaction() as txn:
    table.insert(name='Charlie')
    with db.transaction() as nested_txn:
        # Set Charlie's favorite ORM to Django.
        table.update(name='Charlie', favorite_orm='django', columns=['name'])
        # jk/lol
        nested_txn.rollback()
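At the SQL level, a nested transaction of this kind is typically expressed with a savepoint. The sketch below replays the example above with the standard-library sqlite3 module to show the effect: the outer insert survives while the nested update is undone. It illustrates the general mechanism and is not a trace of the statements DataSet actually issues.

```python
import sqlite3

# isolation_level=None leaves transaction control to the explicit
# statements below.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE users (name TEXT, favorite_orm TEXT)')

conn.execute('BEGIN')  # Outer transaction.
conn.execute("INSERT INTO users (name) VALUES ('Charlie')")

conn.execute('SAVEPOINT nested')  # Nested transaction.
conn.execute(
    "UPDATE users SET favorite_orm = 'django' WHERE name = 'Charlie'")
conn.execute('ROLLBACK TO SAVEPOINT nested')  # Undo only the nested work.
conn.execute('RELEASE SAVEPOINT nested')

conn.execute('COMMIT')

rows = conn.execute('SELECT name, favorite_orm FROM users').fetchall()
print(rows)  # [('Charlie', None)]
```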
Inspecting the database
You can use the tables attribute to list the tables in the current database:
>>> print(db.tables)
['sometable', 'user']
And for a given table, you can print the columns:
>>> table = db['user']
>>> print(table.columns)
['id', 'age', 'name', 'gender', 'favorite_orm']
We can also find out how many rows are in a table:
>>> print(len(db['user']))
3
Reading data
To retrieve all rows, you can use the all() method:
# Retrieve all the users.
users = db['user'].all()
# We can iterate over all rows without calling `.all()`
for user in db['user']:
    print(user['name'])
Specific objects can be retrieved using find() and find_one().
# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')
# Find Huey.
huey = db['user'].find_one(name='Huey')
Exporting data
To export data, use the freeze() method, passing in the query you wish to export:
peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')
API
class DataSet
(url, **kwargs)
The DataSet class provides a high-level API for working with relational databases.
tables
Return a list of tables stored in the database. This list is computed dynamically each time it is accessed.
__getitem__
(table_name)
Provide a Table reference to the specified table. If the table does not exist, it will be created.
query
(sql[, params=None[, commit=True]])
Parameters:
- sql (str) – A SQL query.
- params (list) – Optional parameters for the query.
- commit (bool) – Whether the query should be committed upon execution.
Returns: A database cursor.
Execute the provided query against the database.
transaction
()
Create a context manager representing a new transaction (or savepoint).
freeze
(query[, format='csv'[, filename=None[, file_obj=None[, encoding='utf8'[, **kwargs]]]]])
Parameters:
- query – A SelectQuery, generated using all() or find().
- format – Output format. By default, csv and json are supported.
- filename – Filename to write output to.
- file_obj – File-like object to write output to.
- encoding (str) – File encoding.
- kwargs – Arbitrary parameters for export-specific functionality.
thaw
(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, encoding='utf8'[, **kwargs]]]]]])
Parameters:
- table (str) – The name of the table to load data into.
- format – Input format. By default, csv and json are supported.
- filename – Filename to read data from.
- file_obj – File-like object to read data from.
- strict (bool) – Whether to store values for columns that do not already exist on the table.
- encoding (str) – File encoding.
- kwargs – Arbitrary parameters for import-specific functionality.
connect
()
Open a connection to the underlying database. If a connection is not opened explicitly, one will be opened the first time a query is executed.
close
()
Close the connection to the underlying database.
class Table
(dataset, name, model_class)
Provides a high-level API for working with rows in a given table.
columns
Return a list of columns in the given table.
model_class
A dynamically-created Model class.
create_index
(columns[, unique=False])
Create an index on the given columns:
# Create a unique index on the `username` column.
db['users'].create_index(['username'], unique=True)
insert
(**data)
Insert the given data dictionary into the table, creating new columns as needed.
update
(columns=None, conjunction=None, **data)
Update the table using the provided data. If one or more columns are specified in the columns parameter, then those columns’ values in the data dictionary will be used to determine which rows to update.
# Update all rows.
db['users'].update(favorite_orm='peewee')
# Only update Huey's record, setting his age to 3.
db['users'].update(name='Huey', age=3, columns=['name'])
find
(**query)
Query the table for rows matching the specified equality conditions. If no query is specified, then all rows are returned.
peewee_users = db['users'].find(favorite_orm='peewee')
find_one
(**query)
Return a single row matching the specified equality conditions. If no matching row is found then None will be returned.
huey = db['users'].find_one(name='Huey')
all
()
Return all rows in the given table.
delete
(**query)
Delete all rows matching the given equality conditions. If no query is provided, then all rows will be deleted.
# Adios, Django!
db['users'].delete(favorite_orm='Django')
# Delete all the secret messages.
db['secret_messages'].delete()
freeze
([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
Parameters:
- format – Output format. By default, csv and json are supported.
- filename – Filename to write output to.
- file_obj – File-like object to write output to.
- kwargs – Arbitrary parameters for export-specific functionality.
thaw
([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
Parameters:
- format – Input format. By default, csv and json are supported.
- filename – Filename to read data from.
- file_obj – File-like object to read data from.
- strict (bool) – Whether to store values for columns that do not already exist on the table.
- kwargs – Arbitrary parameters for import-specific functionality.
Fields
These fields can be found in the playhouse.fields module.
class CompressedField
([compression_level=6[, algorithm='zlib'[, **kwargs]]])
Stores compressed data using the specified algorithm. This field extends BlobField, transparently storing a compressed representation of the data in the database.
class PickleField
Stores arbitrary Python data by transparently pickling and un-pickling data stored in the field. This field extends BlobField. If the cPickle module is available, it will be used.
Hybrid Attributes
Hybrid attributes encapsulate functionality that operates at both the Python and SQL levels. The idea for hybrid attributes comes from a feature of the same name in SQLAlchemy. Consider the following example:
class Interval(Model):
    start = IntegerField()
    end = IntegerField()
    @hybrid_property
    def length(self):
        return self.end - self.start
    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)
The hybrid attribute gets its name from the fact that the length attribute will behave differently depending on whether it is accessed via the Interval class or an Interval instance.
If accessed via an instance, then it behaves just as you would expect.
If accessed via the Interval.length class attribute, however, the length calculation will be expressed as a SQL expression. For example:
query = Interval.select().where(Interval.length > 5)
This query will be equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)
The playhouse.hybrid module also contains a decorator for implementing hybrid methods which can accept parameters. As with hybrid properties, when accessed via a model instance, the function executes normally as written. When the hybrid method is called on the class, however, it will generate a SQL expression.
Example:
query = Interval.select().where(Interval.contains(2))
This query is equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
There is an additional API for situations where the Python implementation differs slightly from the SQL implementation. Let’s add a radius method to the Interval model. Because this method calculates an absolute value, we will use the Python abs() function for the instance portion and the fn.ABS() SQL function for the class portion.
class Interval(Model):
    start = IntegerField()
    end = IntegerField()
    @hybrid_property
    def length(self):
        return self.end - self.start
    @hybrid_property
    def radius(self):
        return abs(self.length) / 2
    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2
What is neat is that both the radius implementations refer to the length hybrid attribute! When accessed via an Interval instance, the radius calculation will be executed in Python. When invoked via an Interval class, we will get the appropriate SQL.
Example:
query = Interval.select().where(Interval.radius < 3)
This query is equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)
Pretty neat, right? Thanks for the cool idea, SQLAlchemy!
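The instance-versus-class dispatch behind hybrid attributes can be illustrated with an ordinary Python descriptor. The sketch below is a toy written for this explanation: Expr, as_sql and toy_hybrid_property are hypothetical names, and the real playhouse.hybrid implementation builds Peewee expressions rather than strings.

```python
class Expr:
    """Toy SQL expression that renders itself as a string."""

    def __init__(self, sql):
        self.sql = sql

    def __sub__(self, other):
        return Expr('(%s - %s)' % (self.sql, as_sql(other)))

    def __gt__(self, other):
        return Expr('(%s > %s)' % (self.sql, as_sql(other)))


def as_sql(value):
    return value.sql if isinstance(value, Expr) else repr(value)


class toy_hybrid_property:
    def __init__(self, fget):
        self.fget = fget

    def __get__(self, instance, owner):
        # Class access passes the class itself, so the body operates on
        # column expressions; instance access passes the instance.
        return self.fget(owner if instance is None else instance)


class Interval:
    # Class-level attributes stand in for model fields.
    start = Expr('"start"')
    end = Expr('"end"')

    def __init__(self, start, end):
        # Instance attributes shadow the class-level expressions.
        self.start = start
        self.end = end

    @toy_hybrid_property
    def length(self):
        return self.end - self.start


print(Interval(1, 6).length)  # 5
print((Interval.length > 5).sql)  # (("end" - "start") > 5)
```

The same function body runs in both cases; only the object bound to self changes, which is why a single definition of length can serve both Python and SQL.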
Hybrid API
class hybrid_method
(func[, expr=None])
Method decorator that allows the definition of a Python object method with both instance-level and class-level behavior.
Example:
class Interval(Model):
    start = IntegerField()
    end = IntegerField()
    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)
When called with an Interval instance, the contains method will behave as you would expect. When called as a classmethod, though, a SQL expression will be generated:
query = Interval.select().where(Interval.contains(2))
Would generate the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
expression
(expr)
Method decorator for specifying the SQL-expression producing method.
class hybrid_property
(fget[, fset=None[, fdel=None[, expr=None]]])
Method decorator that allows the definition of a Python object property with both instance-level and class-level behavior.
Examples:
class Interval(Model):
    start = IntegerField()
    end = IntegerField()
    @hybrid_property
    def length(self):
        return self.end - self.start
    @hybrid_property
    def radius(self):
        return abs(self.length) / 2
    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2
When accessed on an Interval instance, the length and radius properties will behave as you would expect. When accessed as class attributes, though, a SQL expression will be generated instead:
query = (Interval
         .select()
         .where(
             (Interval.length > 6) &
             (Interval.radius >= 3)))
Would generate the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (
(("t1"."end" - "t1"."start") > 6) AND
((abs("t1"."end" - "t1"."start") / 2) >= 3)
)
Key/Value Store
The playhouse.kv module contains the implementation of a persistent dictionary.
class KeyValue
([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name='keyvalue']]]]])
Dictionary-like API for storing key/value data. Like dictionaries, supports the expected APIs, but also has the added capability of accepting expressions for getting, setting and deleting items.
The table is created automatically (if it doesn’t exist) when the KeyValue is instantiated.
Uses an efficient upsert implementation for setting and updating/overwriting key/value pairs.
Basic examples:
# Create a key/value store, which uses an in-memory SQLite database
# for data storage.
KV = KeyValue()
# Set (or overwrite) the value for "k1".
KV['k1'] = 'v1'
# Set (or update) multiple keys at once (uses an efficient upsert).
KV.update(k2='v2', k3='v3')
# Getting values works as you'd expect.
assert KV['k2'] == 'v2'
# We can also do this:
for value in KV[KV.key > 'k1']:
    print(value)
# v2
# v3
# Update multiple values at once using expression:
KV[KV.key > 'k1'] = 'vx'
# What's stored in the KV?
print(dict(KV))
# {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'}
# Delete a single item.
del KV['k2']
# How many items are stored in the KV?
print(len(KV))
# 2
# Delete items that match the given condition.
del KV[KV.key > 'k1']
__contains__
(expr)
Parameters: expr – a single key or an expression.
Returns: Boolean indicating whether the key/expression exists.
Example:
>>> kv = KeyValue()
>>> kv.update(k1='v1', k2='v2')
>>> 'k1' in kv
True
>>> 'kx' in kv
False
>>> (kv.key < 'k2') in kv
True
>>> (kv.key > 'k2') in kv
False
__len__
()
Returns: Count of items stored.
__getitem__
(expr)
Parameters: expr – a single key or an expression.
Returns: value(s) corresponding to key/expression.
Raises: KeyError if single key given and not found.
Examples:
>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')
>>> KV['k1']
'v1'
>>> KV['kx']
KeyError: "kx" not found
>>> KV[KV.key > 'k1']
['v2', 'v3']
>>> KV[KV.key < 'k1']
[]
__setitem__
(expr, value)
Parameters:
- expr – a single key or an expression.
- value – value to set for key(s)
Set value for the given key. If expr is an expression, then any keys matching the expression will have their value updated.
Example:
>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')
>>> KV['k1'] = 'v1-x'
>>> print(KV['k1'])
v1-x
>>> KV[KV.key >= 'k2'] = 'v99'
>>> dict(KV)
{'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
__delitem__
(expr)
Parameters: expr – a single key or an expression.
Delete the given key. If an expression is given, delete all keys that match the expression.
Example:
>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2, k3=3)
>>> del KV['k1'] # Deletes "k1".
>>> del KV['k1']
KeyError: "k1" does not exist
>>> del KV[KV.key > 'k2'] # Deletes "k3".
>>> del KV[KV.key > 'k99'] # Nothing deleted, no keys match.
keys
()
Returns: an iterable of all keys in the table.
values
()
Returns: an iterable of all values in the table.
items
()
Returns: an iterable of all key/value pairs in the table.
update
([__data=None[, **mapping]])
Efficiently bulk-insert or replace the given key/value pairs.
Example:
>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2) # Sets 'k1'=1, 'k2'=2.
>>> dict(KV)
{'k1': 1, 'k2': 2}
>>> KV.update(k2=22, k3=3) # Updates 'k2'->22, sets 'k3'=3.
>>> dict(KV)
{'k1': 1, 'k2': 22, 'k3': 3}
>>> KV.update({'k2': -2, 'k4': 4}) # Also can pass a dictionary.
>>> dict(KV)
{'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
get
(expr[, default=None])
Parameters:
- expr – a single key or an expression.
- default – default value if key not found.
Returns: value of given key/expr or default if single key not found.
Get the value at the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list will be returned.
pop
(expr[, default=Sentinel])
Parameters:
- expr – a single key or an expression.
- default – default value if key does not exist.
Returns: value of given key/expr or default if single key not found.
Get value and delete the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list is returned.
clear
()
Remove all items from the key-value table.
Shortcuts
This module contains helper functions for expressing things that would otherwise be somewhat verbose or cumbersome using peewee’s APIs. There are also helpers for serializing models to dictionaries and vice-versa.
model_to_dict
(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]]])
Convert a model instance (and optionally any related instances) to a dictionary.
Examples:
>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}
>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}
>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
    'id': 1,
    'tweets': [
        {'id': 1, 'message': 'tweet-1'},
        {'id': 2, 'message': 'tweet-2'},
    ],
    'username': 'charlie'
}
>>> model_to_dict(t1)
{
    'id': 1,
    'message': 'tweet-1',
    'user': {
        'id': 1,
        'username': 'charlie'
    }
}
>>> model_to_dict(t2, recurse=False)
{'id': 2, 'message': 'tweet-2', 'user': 1}
The implementation of model_to_dict is fairly complex, owing to the various usages it attempts to support. If you have a special usage, I strongly advise that you do not attempt to shoe-horn some crazy combination of parameters into this function. Just write a simple function that accomplishes exactly what you’re attempting to do.
dict_to_model
(model_class, data[, ignore_unknown=False])
Convert a dictionary of data to a model instance, creating related instances where appropriate.
Examples:
>>> user_data = {'id': 1, 'username': 'charlie'}
>>> user = dict_to_model(User, user_data)
>>> user
<__main__.User at 0x7fea8fa4d490>
>>> user.username
'charlie'
>>> note_data = {'id': 2, 'text': 'note text', 'user': user_data}
>>> note = dict_to_model(Note, note_data)
>>> note.text
'note text'
>>> note.user.username
'charlie'
>>> user_with_notes = {
...     'id': 1,
...     'username': 'charlie',
...     'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]}
>>> user = dict_to_model(User, user_with_notes)
>>> user.notes[0].text
'note-1'
>>> user.notes[0].user.username
'charlie'
update_model_from_dict
(instance, data[, ignore_unknown=False])
Update a model instance with the given data dictionary.
resolve_multimodel_query
(query[, key='_model_identifier'])
Returns: an iterable cursor that yields the proper model instance for each row selected in the compound select query.
Helper for resolving rows returned in a compound select query to the correct model instance type. For example, if you have a union of two different tables, this helper will resolve each row to the proper model when iterating over the query results.
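The underlying idea can be sketched without Peewee: tag each half of the compound query with a literal that names its source, then use that column to choose the class for each row. The example below uses only sqlite3 and dataclasses. The Note and Memo classes and both tables are hypothetical, and the column name simply mirrors the default key shown in the signature above.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Note:
    id: int
    text: str


@dataclass
class Memo:
    id: int
    text: str


conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE note (id INTEGER PRIMARY KEY, text TEXT);
    CREATE TABLE memo (id INTEGER PRIMARY KEY, text TEXT);
    INSERT INTO note (text) VALUES ('note-1');
    INSERT INTO memo (text) VALUES ('memo-1');
""")

# Each half of the UNION selects a literal naming its source table.
sql = """
    SELECT id, text, 'note' AS _model_identifier FROM note
    UNION ALL
    SELECT id, text, 'memo' AS _model_identifier FROM memo
    ORDER BY _model_identifier
"""
classes = {'note': Note, 'memo': Memo}

# The identifier column picks the class used to build each row's object.
rows = [classes[identifier](row_id, text)
        for row_id, text, identifier in conn.execute(sql)]
print(rows)
```

Without the identifier column, rows from the two tables would be indistinguishable once combined, since both halves return the same column list.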
class ThreadSafeDatabaseMetadata
Model Metadata implementation that provides thread-safe access to the database attribute, allowing applications to swap the database at run-time safely in a multi-threaded application.
Usage:
from peewee import *
from playhouse.shortcuts import ThreadSafeDatabaseMetadata

# Our multi-threaded application will sometimes swap out the primary
# for the read-replica at run-time.
primary = PostgresqlDatabase(...)
read_replica = PostgresqlDatabase(...)

class BaseModel(Model):
    class Meta:
        database = primary
        model_metadata_class = ThreadSafeDatabaseMetadata
Signal support
Models with hooks for signals (a-la django) are provided in playhouse.signals. To use the signals, you will need all of your project’s models to be a subclass of playhouse.signals.Model, which overrides the necessary methods to provide support for the various signals.
from peewee import IntegerField
from playhouse.signals import Model, post_save

class MyModel(Model):
    data = IntegerField()

@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
    put_data_in_cache(instance.data)
Warning
For what I hope are obvious reasons, Peewee signals do not work when you use the Model.insert(), Model.update(), or Model.delete() methods. These methods generate queries that execute beyond the scope of the ORM, and the ORM does not know about which model instances might or might not be affected when the query executes.
Signals work by hooking into the higher-level peewee APIs like Model.save() and Model.delete_instance(), where the affected model instance is known ahead of time.
The following signals are provided:
pre_save
Called immediately before an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
post_save
Called immediately after an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
pre_delete
Called immediately before an object is deleted from the database when Model.delete_instance() is used.
post_delete
Called immediately after an object is deleted from the database when Model.delete_instance() is used.
pre_init
Called when a model class is first instantiated.
Connecting handlers
Whenever a signal is dispatched, it will call any handlers that have been registered. This allows totally separate code to respond to events like model save and delete.
The Signal class provides a connect() method, which takes a callback function and two optional parameters for “sender” and “name”. If specified, the “sender” parameter should be a single model class and allows your callback to only receive signals from that one model class. The “name” parameter is used as a convenient alias in the event you wish to unregister your signal handler.
Example usage:
from playhouse.signals import *

def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)

# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)
All signal handlers accept as their first two arguments sender and instance, where sender is the model class and instance is the actual model being acted upon.
If you’d like, you can also use a decorator to connect signal handlers. This is functionally equivalent to the above example:
@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)
Signal API
class Signal
Stores a list of receivers (callbacks) and calls them when the “send” method is invoked.
connect
(receiver[, sender=None[, name=None]])
Parameters:
- receiver (callable) – a callable that takes at least two parameters, a “sender”, which is the Model subclass that triggered the signal, and an “instance”, which is the actual model instance.
- sender (Model) – if specified, only instances of this model class will trigger the receiver callback.
- name (string) – a short alias
Add the receiver to the internal list of receivers, which will be called whenever the signal is sent.
from playhouse.signals import post_save
from project.handlers import cache_buster
post_save.connect(cache_buster, name='project.cache_buster')
disconnect
([receiver=None[, name=None]])
Parameters:
- receiver (callable) – the callback to disconnect
- name (string) – a short alias
Disconnect the given receiver (or the receiver with the given name alias) so that it is no longer called. Either the receiver or the name must be provided.
post_save.disconnect(name='project.cache_buster')
send
(instance, *args, **kwargs)
Parameters: instance – a model instance
Iterates over the receivers and will call them in the order in which they were connected. If the receiver specified a sender, it will only be called if the instance is an instance of the sender.
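The connect, disconnect and send semantics described above can be sketched in a few lines of plain Python. SignalSketch is a hypothetical, simplified stand-in written for illustration; it is not the playhouse.signals.Signal class and omits whatever extra checks the real class performs.

```python
class SignalSketch:
    """Hypothetical, simplified stand-in for a signal object."""

    def __init__(self):
        self._receivers = []  # (name, receiver, sender), in connection order

    def connect(self, receiver, sender=None, name=None):
        self._receivers.append((name or receiver.__name__, receiver, sender))

    def disconnect(self, receiver=None, name=None):
        if receiver is None and name is None:
            raise ValueError('a receiver or a name is required')
        self._receivers = [
            (n, r, s) for n, r, s in self._receivers
            if not (r is receiver or n == name)]

    def send(self, instance, *args, **kwargs):
        sender = type(instance)
        for _, receiver, wanted in self._receivers:
            # Receivers bound to a sender only fire for its instances.
            if wanted is None or isinstance(instance, wanted):
                receiver(sender, instance, *args, **kwargs)


class User:
    pass


class Tweet:
    pass


calls = []
post_save = SignalSketch()
post_save.connect(
    lambda sender, instance, created: calls.append('any'), name='any')


def user_only(sender, instance, created):
    calls.append('user_only')


post_save.connect(user_only, sender=User)

post_save.send(User(), created=True)    # Both receivers run, in order.
post_save.send(Tweet(), created=True)   # Only the unfiltered receiver runs.
post_save.disconnect(name='any')
post_save.send(User(), created=False)   # Only user_only remains connected.
print(calls)
```

The name alias matters for receivers such as the lambda above, which cannot easily be referenced again when it is time to disconnect them.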
pwiz, a model generator
pwiz is a little script that ships with peewee and is capable of introspecting an existing database and generating model code suitable for interacting with the underlying data. If you have a database already, pwiz can give you a nice boost by generating skeleton code with correct column affinities and foreign keys.
If you install peewee using setup.py install, pwiz will be installed as a “script” and you can just run:
python -m pwiz -e postgresql -u postgres my_postgres_db
This will print a bunch of models to standard output. So you can do this:
python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print([blog.name for blog in Blog.select()])
Command-line options
pwiz accepts the following command-line options:
Option | Meaning | Example |
---|---|---|
-h | show help | |
-e | database backend | -e mysql |
-H | host to connect to | -H remote.db.server |
-p | port to connect on | -p 9001 |
-u | database user | -u postgres |
-P | database password | -P (will be prompted for password) |
-s | schema | -s public |
-t | tables to generate | -t tweet,users,relationships |
-v | generate models for VIEWs | (no argument) |
-i | add info metadata to generated file | (no argument) |
-o | table column order is preserved | (no argument) |
The following are valid parameters for the engine (-e):
- sqlite
- mysql
- postgresql
Warning
If a password is required to access your database, you will be prompted to enter it using a secure prompt.
The password will be included in the output. Specifically, at the top of the file a Database will be defined along with any required parameters – including the password.
pwiz examples
Examples of introspecting various databases:
# Introspect a Sqlite database.
python -m pwiz -e sqlite path/to/sqlite_database.db
# Introspect a MySQL database, logging in as root. You will be prompted
# for a password ("-P").
python -m pwiz -e mysql -u root -P mysql_db_name
# Introspect a Postgresql database on a remote server.
python -m pwiz -e postgresql -u postgres -H 10.1.0.3 pg_db_name
Full example:
$ sqlite3 example.db << EOM
CREATE TABLE "user" ("id" INTEGER NOT NULL PRIMARY KEY, "username" TEXT NOT NULL);
CREATE TABLE "tweet" (
"id" INTEGER NOT NULL PRIMARY KEY,
"content" TEXT NOT NULL,
"timestamp" DATETIME NOT NULL,
"user_id" INTEGER NOT NULL,
FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
CREATE UNIQUE INDEX "user_username" ON "user" ("username");
EOM
$ python -m pwiz -e sqlite example.db
Produces the following output:
from peewee import *

database = SqliteDatabase('example.db', **{})

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class User(BaseModel):
    username = TextField(unique=True)

    class Meta:
        table_name = 'user'

class Tweet(BaseModel):
    content = TextField()
    timestamp = DateTimeField()
    user = ForeignKeyField(column_name='user_id', field='id', model=User)

    class Meta:
        table_name = 'tweet'
Observations:
- The foreign-key Tweet.user_id is detected and mapped correctly.
- The User.username UNIQUE constraint is detected.
- Each model explicitly declares its table name, even in cases where it is not necessary (as Peewee would automatically translate the class name into the appropriate table name).
- All the parameters of the ForeignKeyField are explicitly declared, even though they follow the conventions Peewee uses by default.
Note
The UnknownField is a placeholder that is used in the event your schema contains a column declaration that Peewee doesn’t know how to map to a field class.
Schema Migrations
Peewee now supports schema migrations, with well-tested support for Postgresql, SQLite and MySQL. Unlike other schema migration tools, peewee’s migrations do not handle introspection and database “versioning”. Rather, peewee provides a number of helper functions for generating and running schema-altering statements. This engine provides the basis on which a more sophisticated tool could some day be built.
Migrations can be written as simple Python scripts and executed from the command-line. Since the migrations only depend on your application’s Database object, it should be easy to manage changing your model definitions and maintaining a set of migration scripts without introducing dependencies.
Example usage
Begin by importing the helpers from the migrate module:
from playhouse.migrate import *
Instantiate a migrator. The SchemaMigrator class is responsible for generating schema altering operations, which can then be run sequentially by the migrate() helper.
# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)
# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)
Use migrate() to execute one or more operations:
title_field = CharField(default='')
status_field = IntegerField(null=True)

migrate(
    migrator.add_column('some_table', 'title', title_field),
    migrator.add_column('some_table', 'status', status_field),
    migrator.drop_column('some_table', 'old_column'),
)
Warning
Migrations are not run inside a transaction. If you wish the migration to run in a transaction you will need to wrap the call to migrate in an atomic() context-manager, e.g.
with my_db.atomic():
    migrate(...)
Supported Operations
Add new field(s) to an existing model:
# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')
# Run the migration, specifying the database table, field name and field.
migrate(
    migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
    migrator.add_column('comment_tbl', 'comment', comment_field),
)
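Non-null fields need a default because, as the add_column notes in the Migrations API describe, such a column is first added as nullable, then populated with the default via UPDATE, and only then marked not null. The sketch below merely assembles that sequence as PostgreSQL-style SQL strings so the order is visible. add_not_null_column_sql is a hypothetical helper, and the exact SQL Peewee emits may differ.

```python
def add_not_null_column_sql(table, column, sql_type, default_literal):
    """Return the three statements in the order they would run."""
    return [
        # 1. Add the column as nullable so existing rows remain valid.
        'ALTER TABLE "%s" ADD COLUMN "%s" %s' % (table, column, sql_type),
        # 2. Back-fill existing rows with the default value.
        'UPDATE "%s" SET "%s" = %s' % (table, column, default_literal),
        # 3. No NULLs remain, so the constraint can now be enforced.
        'ALTER TABLE "%s" ALTER COLUMN "%s" SET NOT NULL' % (table, column),
    ]


statements = add_not_null_column_sql('comment_tbl', 'comment', 'TEXT', "''")
for statement in statements:
    print(statement)
```

Without a default there would be nothing to write in step 2, and step 3 would fail on any table that already contains rows.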
Renaming a field:
# Specify the table, original name of the column, and its new name.
migrate(
    migrator.rename_column('story', 'pub_date', 'publish_date'),
    migrator.rename_column('story', 'mod_date', 'modified_date'),
)
Dropping a field:
migrate(
    migrator.drop_column('story', 'some_old_field'),
)
Making a field nullable or not nullable:
# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
    # Make `pub_date` allow NULL values.
    migrator.drop_not_null('story', 'pub_date'),

    # Prevent `modified_date` from containing NULL values.
    migrator.add_not_null('story', 'modified_date'),
)
Altering a field’s data-type:
# Change a VARCHAR(50) field to a TEXT field.
migrate(
    migrator.alter_column_type('person', 'email', TextField())
)
Renaming a table:
migrate(
    migrator.rename_table('story', 'stories_tbl'),
)
Adding an index:
# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
    # Create an index on the `pub_date` column.
    migrator.add_index('story', ('pub_date',), False),

    # Create a multi-column index on the `pub_date` and `status` fields.
    migrator.add_index('story', ('pub_date', 'status'), False),

    # Create a unique index on the category and title fields.
    migrator.add_index('story', ('category_id', 'title'), True),
)
Dropping an index:
# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))
Adding or dropping table constraints:
# Add a CHECK() constraint to enforce the price cannot be negative.
migrate(migrator.add_constraint(
    'products',
    'price_check',
    Check('price >= 0')))
# Remove the price check constraint.
migrate(migrator.drop_constraint('products', 'price_check'))
# Add a UNIQUE constraint on the first and last names.
migrate(migrator.add_unique('person', 'first_name', 'last_name'))
Note
Postgres users may need to set the search-path when using a non-standard schema. This can be done as follows:
new_field = TextField(default='', null=False)
migrator = PostgresqlMigrator(db)
migrate(migrator.set_search_path('my_schema_name'),
        migrator.add_column('table', 'field_name', new_field))
Migrations API
migrate
(*operations)
Execute one or more schema altering operations.
Usage:
migrate(
    migrator.add_column('some_table', 'new_column', CharField(default='')),
    migrator.add_index('some_table', ('new_column',)),
)
class SchemaMigrator
(database)
Parameters: database – a Database instance.
The SchemaMigrator is responsible for generating schema-altering statements.
add_column
(table, column_name, field)
Add a new column to the provided table. The field provided will be used to generate the appropriate column definition.
Note
If the field is not nullable it must specify a default value.
Note
For non-null fields, the field will initially be added as a null field, then an UPDATE statement will be executed to populate the column with the default value. Finally, the column will be marked as not null.
drop_column
(table, column_name[, cascade=True])
Parameters:
- table (str) – Name of the table to drop column from.
- column_name (str) – Name of the column to drop.
- cascade (bool) – Whether the column should be dropped with CASCADE.
rename_column
(table, old_name, new_name)
Parameters:
- table (str) – Name of the table containing column to rename.
- old_name (str) – Current name of the column.
- new_name (str) – New name for the column.
add_not_null
(table, column)
Parameters:
- table (str) – Name of table containing column.
- column (str) – Name of the column to make not nullable.
drop_not_null
(table, column)
Parameters:
- table (str) – Name of table containing column.
- column (str) – Name of the column to make nullable.
alter_column_type
(table, column, field[, cast=None])
Parameters:
- table (str) – Name of the table.
- column (str) – Name of the column to modify.
- field (Field) – Field instance representing new data type.
- cast – (postgres-only) specify a cast expression if the data-types are incompatible, e.g. column_name::int. Can be provided as either a string or a Cast instance.
Alter the data-type of a column. This method should be used with care, as using incompatible types may not be well-supported by your database.
rename_table
(old_name, new_name)
Parameters:
- old_name (str) – Current name of the table.
- new_name (str) – New name for the table.
add_index
(table, columns[, unique=False[, using=None]])
Parameters:
- table (str) – Name of table on which to create the index.
- columns (list) – List of columns which should be indexed.
- unique (bool) – Whether the new index should specify a unique constraint.
- using (str) – Index type (where supported), e.g. GiST or GIN.
drop_index
(table, index_name)
Parameters:
- table (str) – Name of the table containing the index to be dropped.
- index_name (str) – Name of the index to be dropped.
add_constraint
(table, name, constraint)
drop_constraint
(table, name)
Parameters:
- table (str) – Table to drop constraint from.
- name (str) – Name of constraint to drop.
add_unique
(table, *column_names)
Parameters:
- table (str) – Table to add constraint to.
- column_names (str) – One or more columns for UNIQUE constraint.
class PostgresqlMigrator
(database)
Generate migrations for Postgresql databases.
set_search_path
(schema_name)
Parameters: schema_name (str) – Schema to use.
Set the search path (schema) for the subsequent operations.
class SqliteMigrator
(database)
Generate migrations for SQLite databases.
SQLite has limited support for ALTER TABLE queries, so the following operations are currently not supported for SQLite:
add_constraint
drop_constraint
add_unique
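The limitation can be observed with the standard-library sqlite3 module alone. In this sketch (the products table and price_check name are borrowed from the constraint example earlier in this section; products_checked is a hypothetical second table), SQLite rejects ALTER TABLE ... ADD CONSTRAINT, while the same CHECK is accepted and enforced when it is declared in CREATE TABLE.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE products (id INTEGER PRIMARY KEY, price REAL)')

try:
    # SQLite's ALTER TABLE has no ADD CONSTRAINT form.
    conn.execute(
        'ALTER TABLE products ADD CONSTRAINT price_check CHECK (price >= 0)')
    rejected = False
except sqlite3.OperationalError:
    rejected = True

# The constraint can still be declared when the table is created.
conn.execute(
    'CREATE TABLE products_checked ('
    'id INTEGER PRIMARY KEY, price REAL CHECK (price >= 0))')

try:
    conn.execute('INSERT INTO products_checked (price) VALUES (-1)')
    enforced = False
except sqlite3.IntegrityError:
    enforced = True

print(rejected, enforced)
```

In practice this means constraints on SQLite tables are best declared up front in the model definition rather than added later by a migration.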
class MySQLMigrator
(database)
Generate migrations for MySQL databases.
Reflection
The reflection module contains helpers for introspecting existing databases. This module is used internally by several other modules in the playhouse, including DataSet and pwiz, a model generator.
generate_models
(database[, schema=None[, **options]])
Returns: a dict mapping table names to model classes.
Generate models for the tables in the given database. For an example of how to use this function, see the section Using Peewee Interactively.
Example:
>>> from peewee import *
>>> from playhouse.reflection import generate_models
>>> db = PostgresqlDatabase('my_app')
>>> models = generate_models(db)
>>> list(models.keys())
['account', 'customer', 'order', 'orderitem', 'product']
>>> globals().update(models) # Inject models into namespace.
>>> for cust in customer.select():  # Query using generated model.
...     print(cust.name)
...
Huey Kitty
Mickey Dog
print_model
(model)
Parameters: | model (Model) – model class to print |
---|---|
Returns: | no return value |
Print a user-friendly description of a model class, useful for debugging or interactive use. Currently this prints the table name, and all fields along with their data-types. The Using Peewee Interactively section contains an example.
Example output:
>>> from playhouse.reflection import print_model
>>> print_model(User)
user
  id AUTO PK
  email TEXT
  name TEXT
  dob DATE

index(es)
  email UNIQUE
>>> print_model(Tweet)
tweet
  id AUTO PK
  user INT FK: User.id
  title TEXT
  content TEXT
  timestamp DATETIME
  is_published BOOL

index(es)
  user_id
  is_published, timestamp
print_table_sql
(model)
Parameters: | model (Model) – model to print |
---|---|
Returns: | no return value |
Prints the SQL CREATE TABLE for the given model class, which may be useful for debugging or interactive use. See the Using Peewee Interactively section for example usage. Note that indexes and constraints are not included in the output of this function.
Example output:
>>> from playhouse.reflection import print_table_sql
>>> print_table_sql(User)
CREATE TABLE IF NOT EXISTS "user" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "email" TEXT NOT NULL,
  "name" TEXT NOT NULL,
  "dob" DATE NOT NULL
)
>>> print_table_sql(Tweet)
CREATE TABLE IF NOT EXISTS "tweet" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "user_id" INTEGER NOT NULL,
  "title" TEXT NOT NULL,
  "content" TEXT NOT NULL,
  "timestamp" DATETIME NOT NULL,
  "is_published" INTEGER NOT NULL,
  FOREIGN KEY ("user_id") REFERENCES "user" ("id")
)
class Introspector
(metadata[, schema=None])
Metadata can be extracted from a database by instantiating an Introspector. Rather than instantiating this class directly, it is recommended to use the factory method from_database().
classmethod from_database
(database[, schema=None])
Parameters:
- database – a Database instance.
- schema (str) – an optional schema (supported by some databases).
Creates an Introspector instance suitable for use with the given database.
Usage:
db = SqliteDatabase('my_app.db')
introspector = Introspector.from_database(db)
models = introspector.generate_models()
# User and Tweet (assumed to exist in the database) are
# peewee Model classes generated from the database schema.
User = models['user']
Tweet = models['tweet']
generate_models
([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False[, include_views=False]]]]])
Parameters:
- skip_invalid (bool) – Skip tables whose names are invalid python identifiers.
- table_names (list) – List of table names to generate. If unspecified, models are generated for all tables.
- literal_column_names (bool) – Use column-names as-is. By default, column names are “python-ized”, i.e. mixed-case becomes lower-case.
- bare_fields – SQLite-only. Do not specify data-types for introspected columns.
- include_views – generate models for VIEWs as well.
Returns: A dictionary mapping table-names to model classes.
Introspect the database, reading in the tables, columns, and foreign key constraints, then generate a dictionary mapping each database table to a dynamically-generated Model class.
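To give a feel for the kind of metadata involved, the sketch below reads tables, columns and foreign keys from a SQLite database using only the standard-library sqlite3 module. It is an illustration of the general approach for one backend, not the Introspector's code. The schema dictionary layout is an assumption made for this example, and the tables mirror the ones used in the pwiz example.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE "user" (
        "id" INTEGER NOT NULL PRIMARY KEY,
        "username" TEXT NOT NULL);
    CREATE TABLE "tweet" (
        "id" INTEGER NOT NULL PRIMARY KEY,
        "content" TEXT NOT NULL,
        "user_id" INTEGER NOT NULL,
        FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
""")

tables = [name for (name,) in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]

schema = {}
for table in tables:
    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
    columns = {
        row[1]: row[2]
        for row in conn.execute('PRAGMA table_info("%s")' % table)}
    # PRAGMA foreign_key_list rows: (id, seq, table, from, to, ...)
    foreign_keys = {
        row[3]: (row[2], row[4])
        for row in conn.execute('PRAGMA foreign_key_list("%s")' % table)}
    schema[table] = {'columns': columns, 'foreign_keys': foreign_keys}

print(schema['tweet'])
```

A model generator can then turn each entry into a class, mapping declared column types to field classes and each foreign key to a reference to the related table.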
Database URL
This module contains a helper function to generate a database connection from a URL connection string.
connect
(url, **connect_params)
Create a Database instance from the given connection URL.
Examples:
- sqlite:///my_database.db will create a SqliteDatabase instance for the file my_database.db in the current directory.
- sqlite:///:memory: will create an in-memory SqliteDatabase instance.
- postgresql://postgres:my_password@localhost:5432/my_database will create a PostgresqlDatabase instance. A username and password are provided, as well as the host and port to connect to.
- mysql://user:passwd@ip:port/my_db will create a MySQLDatabase instance for the local MySQL database my_db.
- mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 will create a PooledMySQLDatabase instance for the local MySQL database my_db with max_connections set to 20 and a stale_timeout setting of 300 seconds.
Supported schemes:
- apsw: APSWDatabase
- mysql: MySQLDatabase
- mysql+pool: PooledMySQLDatabase
- postgres: PostgresqlDatabase
- postgres+pool: PooledPostgresqlDatabase
- postgresext: PostgresqlExtDatabase
- postgresext+pool: PooledPostgresqlExtDatabase
- sqlite: SqliteDatabase
- sqliteext: SqliteExtDatabase
- sqlite+pool: PooledSqliteDatabase
- sqliteext+pool: PooledSqliteExtDatabase
Usage:
import os
from playhouse.db_url import connect
# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
parse
(url)
Parse the information in the given URL into a dictionary containing database, host, port, user and/or password. Additional connection arguments can be passed in the URL query string.
If you are using a custom database class, you can use the parse() function to extract information from a URL which can then be passed in to your database object.
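The mapping from URL components to dictionary keys can be approximated with the standard library's urllib.parse. The function below is a rough sketch of the behavior described above, not the playhouse.db_url implementation. parse_sketch is a hypothetical name, and details such as type conversion of query-string values are assumptions.

```python
from urllib.parse import parse_qsl, urlparse


def parse_sketch(url):
    """Rough approximation of the dictionary described above."""
    parsed = urlparse(url)
    params = {'database': parsed.path[1:]}  # Drop the leading "/".
    if parsed.username:
        params['user'] = parsed.username
    if parsed.password:
        params['password'] = parsed.password
    if parsed.hostname:
        params['host'] = parsed.hostname
    if parsed.port:
        params['port'] = parsed.port
    # Extra connection arguments come from the query string.
    for key, value in parse_qsl(parsed.query):
        params[key] = int(value) if value.isdigit() else value
    return params


params = parse_sketch(
    'postgresql://postgres:my_password@localhost:5432/my_database'
    '?connect_timeout=10')
print(params)
```

A dictionary of this shape can be unpacked into the constructor of a custom database class, with the database name passed as the first argument.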
register_database
(db_class, *names)
Register additional database class under the specified names. This function can be used to extend the connect() function to support additional schemes. Suppose you have a custom database class for Firebird named FirebirdDatabase.
from playhouse.db_url import connect, register_database
register_database(FirebirdDatabase, 'firebird')
db = connect('firebird://my-firebird-db')
Connection pool
The pool module contains a number of Database classes that provide connection pooling for PostgreSQL, MySQL and SQLite databases. The pool works by overriding the methods on the Database class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.
In a multi-threaded application, up to max_connections will be opened. Each thread (or, if using gevent, greenlet) will have its own connection.
In a single-threaded application, only one connection will be created. It will be continually recycled until either it exceeds the stale timeout or is closed explicitly (using .manual_close()).
By default, all your application needs to do is ensure that connections are closed when you are finished with them, and they will be returned to the pool. For web applications, this typically means that at the beginning of a request, you will open a connection, and when you return a response, you will close the connection.
Simple Postgres pool example code:
from peewee import Model

# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_app',
    max_connections=32,
    stale_timeout=300,  # 5 minutes.
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db
That’s it! If you would like finer-grained control over the pool of connections, check out the Connection Management section.
Pool APIs
class PooledDatabase
(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])
Mixin class intended to be used with a subclass of Database.
Note
Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.
Note
If the number of open connections exceeds max_connections, a ValueError will be raised.
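The two notes above can be illustrated with a small, single-threaded sketch in plain Python. PoolSketch is a hypothetical class written for this example and is not the PooledDatabase implementation: it ignores threads, and the injectable clock exists only so that time can be advanced deterministically.

```python
import time


class PoolSketch:
    """Hypothetical illustration of the stale-timeout and limit rules."""

    def __init__(self, connect, max_connections=20, stale_timeout=None,
                 clock=time.monotonic):
        self.connect = connect
        self.max_connections = max_connections
        self.stale_timeout = stale_timeout
        self.clock = clock
        self.idle = []     # [(created_at, conn)] returned to the pool
        self.in_use = {}   # id(conn) -> created_at
        self.closed = []   # connections discarded as stale

    def acquire(self):
        # Staleness is only checked here, when a connection is requested.
        while self.idle:
            created_at, conn = self.idle.pop(0)
            age = self.clock() - created_at
            if self.stale_timeout is not None and age > self.stale_timeout:
                self.closed.append(conn)
                continue
            self.in_use[id(conn)] = created_at
            return conn
        if len(self.in_use) >= self.max_connections:
            raise ValueError('Exceeded maximum connections.')
        conn = self.connect()
        self.in_use[id(conn)] = self.clock()
        return conn

    def release(self, conn):
        # "Closing" a pooled connection returns it to the idle list.
        self.idle.append((self.in_use.pop(id(conn)), conn))


now = [0.0]
pool = PoolSketch(connect=object, max_connections=1, stale_timeout=300,
                  clock=lambda: now[0])

first = pool.acquire()
try:
    pool.acquire()
    limit_hit = False
except ValueError:
    limit_hit = True

pool.release(first)
reused = pool.acquire() is first   # Still fresh, so it is recycled.
pool.release(first)

now[0] = 301.0                     # The idle connection is now stale.
replacement = pool.acquire()
print(limit_hit, reused, replacement is first)
```

Note that the stale connection sat in the idle list past its timeout and was only discarded when the next request arrived, which matches the first note above.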
manual_close
()
Close the currently-open connection without returning it to the pool.
close_idle
()
Close all idle connections. This does not include any connections that are currently in-use – only those that were previously created but have since been returned back to the pool.
close_stale
([age=600])
Parameters: age (int) – Age at which a connection should be considered stale.
Returns: Number of connections closed.
Close connections which are in-use but exceed the given age. Use caution when calling this method!
close_all
()
Close all connections. This includes any connections that may be in use at the time. Use caution when calling this method!
class PooledPostgresqlDatabase
Subclass of PostgresqlDatabase that mixes in the PooledDatabase helper.
class PooledPostgresqlExtDatabase
Subclass of PostgresqlExtDatabase that mixes in the PooledDatabase helper. The PostgresqlExtDatabase is a part of the Postgresql Extensions module and provides support for many Postgres-specific features.
class PooledMySQLDatabase
Subclass of MySQLDatabase that mixes in the PooledDatabase helper.
class PooledSqliteDatabase
Persistent connections for SQLite apps.
class PooledSqliteExtDatabase
Persistent connections for SQLite apps, using the SQLite Extensions advanced database driver SqliteExtDatabase.
Test Utils
Contains utilities helpful when testing peewee projects.
class count_queries([only_select=False])
Context manager that will count the number of queries executed within the context.
Parameters: only_select (bool) – Only count SELECT queries.
with count_queries() as counter:
    huey = User.get(User.username == 'huey')
    huey_tweets = [tweet.message for tweet in huey.tweets]

assert counter.count == 2
count
The number of queries executed.
get_queries()
Return a list of 2-tuples consisting of the SQL query and a list of parameters.
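To make the idea of a query-counting context manager concrete, here is a minimal stand-alone sketch built on the standard library's sqlite3 module. It is not how playhouse.test_utils is implemented: the helper name count_sqlite_queries and the QueryLog class are hypothetical, and the sketch relies on sqlite3's trace callback, which only sees statements on a single SQLite connection.

```python
import sqlite3
from contextlib import contextmanager


class QueryLog:
    """Collects the SQL statements seen while the context is active."""

    def __init__(self):
        self.queries = []

    @property
    def count(self):
        return len(self.queries)


@contextmanager
def count_sqlite_queries(conn, only_select=False):
    # Hypothetical helper: records statements via sqlite3's trace callback.
    log = QueryLog()

    def record(statement):
        is_select = statement.lstrip().upper().startswith('SELECT')
        if only_select and not is_select:
            return
        log.queries.append(statement)

    conn.set_trace_callback(record)
    try:
        yield log
    finally:
        conn.set_trace_callback(None)  # stop tracing on exit


# Usage: autocommit mode avoids implicit BEGIN statements in the trace.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE users (username TEXT)')
with count_sqlite_queries(conn, only_select=True) as counter:
    conn.execute("INSERT INTO users VALUES ('huey')")
    conn.execute('SELECT * FROM users').fetchall()
print(counter.count, counter.queries)
```

The only_select flag mirrors the parameter documented above: when it is set, statements that do not begin with SELECT are ignored.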
assert_query_count(expected[, only_select=False])
Function or method decorator that will raise an AssertionError if the number of queries executed in the decorated function does not equal the expected number.
class TestMyApp(unittest.TestCase):
    @assert_query_count(1)
    def test_get_popular_blogs(self):
        popular_blogs = Blog.get_popular()
        self.assertEqual(
            [blog.title for blog in popular_blogs],
            ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])
This function can also be used as a context manager:
class TestMyApp(unittest.TestCase):
    def test_expensive_operation(self):
        with assert_query_count(1):
            perform_expensive_operation()
Flask Utils
The playhouse.flask_utils module contains several helpers for integrating peewee with the Flask web framework.
Database Wrapper
The FlaskDB class is a wrapper for configuring and referencing a Peewee database from within a Flask application. Don’t let its name fool you: it is not the same thing as a peewee database. FlaskDB is designed to remove the following boilerplate from your flask app:
- Dynamically create a Peewee database instance based on app config data.
- Create a base class from which all your application’s models will descend.
- Register hooks at the start and end of a request to handle opening and closing a database connection.
Basic usage:
import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB

DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'

# If we want to exclude particular views from the automatic connection
# management, we list them this way:
FLASKDB_EXCLUDED_ROUTES = ('logout',)

app = Flask(__name__)
app.config.from_object(__name__)

db_wrapper = FlaskDB(app)

class User(db_wrapper.Model):
    username = CharField(unique=True)

class Tweet(db_wrapper.Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = DateTimeField(default=datetime.datetime.now)
The above code example will create and instantiate a peewee PostgresqlDatabase specified by the given database URL. Request hooks will be configured to establish a connection when a request is received, and automatically close the connection when the response is sent. Lastly, the FlaskDB class exposes a FlaskDB.Model property which can be used as a base for your application’s models.
Here is how you can access the wrapped Peewee database instance that is configured for you by the FlaskDB wrapper:
# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database

@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
    with peewee_db.atomic():
        # ...

    return jsonify({'transfer-id': xid})
Note
The actual peewee database can be accessed using the FlaskDB.database attribute.
Here is another way to configure a Peewee database using FlaskDB:
app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')
While the above examples show using a database URL, for more advanced usages you can specify a dictionary of configuration options, or simply pass in a peewee Database instance:
DATABASE = {
    'name': 'my_app_db',
    'engine': 'playhouse.pool.PooledPostgresqlDatabase',
    'user': 'postgres',
    'max_connections': 32,
    'stale_timeout': 600,
}

app = Flask(__name__)
app.config.from_object(__name__)

wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database
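The engine value in the dictionary above is a dotted path to a database class. As a rough illustration of how such a path can be turned into a class, the following sketch uses importlib from the standard library. The helper names load_class and database_from_config are hypothetical, and the fallback to the peewee module for bare class names is an assumption made for this example rather than documented behavior.

```python
import importlib


def load_class(dotted_path, default_module='peewee'):
    """Resolve 'package.module.ClassName' to the class object."""
    if '.' in dotted_path:
        module_name, class_name = dotted_path.rsplit('.', 1)
    else:
        # Assumption: a bare class name is looked up in a default module.
        module_name, class_name = default_module, dotted_path
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def database_from_config(config):
    """Split a config dict into (class, name, remaining keyword arguments)."""
    options = dict(config)  # copy so the caller's dict is left untouched
    engine = load_class(options.pop('engine'))
    name = options.pop('name')
    return engine, name, options
```

With the DATABASE dictionary shown earlier, the remaining options (user, max_connections, stale_timeout) would then be passed as keyword arguments when instantiating the resolved class, along the lines of engine(name, **options).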
Using a peewee Database object:
peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)
Database with Application Factory
If you prefer to use the application factory pattern, the FlaskDB class implements an init_app() method.
Using as a factory:
db_wrapper = FlaskDB()

# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
    username = CharField(unique=True)

def create_app():
    app = Flask(__name__)
    app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
    db_wrapper.init_app(app)
    return app
Query utilities
The flask_utils module provides several helpers for managing queries in your web app. Some common patterns include:
get_object_or_404(query_or_model, *query)
Retrieve the object matching the given query, or return a 404 not found response. A common use-case might be a detail page for a weblog. You want to either retrieve the post matching the given URL, or return a 404.
Example:
@app.route('/blog/<slug>/')
def post_detail(slug):
    public_posts = Post.select().where(Post.published == True)
    post = get_object_or_404(public_posts, (Post.slug == slug))
    return render_template('post_detail.html', post=post)
object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])
Retrieve a paginated list of objects specified by the given query. The paginated object list will be dropped into the context using the given context_variable, as well as metadata about the current page and total number of pages, and finally any arbitrary context data passed as keyword-arguments.
The page is specified using the page GET argument, e.g. /my-object-list/?page=3 would return the third page of objects.
Example:
@app.route('/blog/')
def post_index():
    public_posts = (Post
                    .select()
                    .where(Post.published == True)
                    .order_by(Post.timestamp.desc()))

    return object_list(
        'post_index.html',
        query=public_posts,
        context_variable='post_list',
        paginate_by=10)
The template will have the following context:
- post_list, which contains a list of up to 10 posts.
- page, which contains the current page based on the value of the page GET parameter.
- pagination, a PaginatedQuery instance.
class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])
Helper class to perform pagination based on GET arguments.
get_page()
Return the currently selected page, as indicated by the value of the page_var GET parameter. If no page is explicitly selected, then this method will return 1, indicating the first page.
get_page_count()
Return the total number of possible pages.
get_object_list()
Using the value of get_page(), return the page of objects requested by the user. The return value is a SelectQuery with the appropriate LIMIT and OFFSET clauses.
If check_bounds was set to True and the requested page contains no objects, then a 404 will be raised.
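The arithmetic behind this kind of pagination is short enough to sketch on its own. The functions below are hypothetical stand-ins written for illustration, not the PaginatedQuery implementation: parse_page, page_count, page_window and the PageNotFound exception are invented names, and treating a non-numeric page value as page 1 is an assumption.

```python
import math


class PageNotFound(Exception):
    """Stand-in for the 404 response a web framework would produce."""


def parse_page(raw_value):
    # A missing value means the first page. Falling back to page 1 for
    # non-numeric input is an assumption made for this sketch.
    if raw_value and raw_value.isdigit():
        return max(1, int(raw_value))
    return 1


def page_count(total_rows, paginate_by):
    # Number of pages needed to show every row.
    return math.ceil(total_rows / paginate_by)


def page_window(page, total_rows, paginate_by, check_bounds=False):
    """Return the (limit, offset) pair for a 1-based page number."""
    if check_bounds and page > page_count(total_rows, paginate_by):
        raise PageNotFound(page)
    return paginate_by, (page - 1) * paginate_by
```

For 25 rows and paginate_by=10 there are three pages, and page 3 corresponds to LIMIT 10 OFFSET 20. Requesting page 4 with check_bounds=True raises PageNotFound in this sketch, which plays the role of the 404 described above.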