- Playhouse, extensions to Peewee
Playhouse, extensions to Peewee
Peewee comes with numerous extension modules which are collected under the playhouse
namespace. Despite the silly name, there are some very useful extensions, particularly those that expose vendor-specific database features like the SQLite Extensions and Postgresql Extensions extensions.
Below you will find a loosely organized listing of the various modules that make up the playhouse
.
Database drivers / vendor-specific database functionality
- SQLite Extensions (on its own page)
- SqliteQ
- Sqlite User-Defined Functions
- apsw, an advanced sqlite driver
- Sqlcipher backend
- Postgresql Extensions
- MySQL Extensions
High-level features
Database management and framework integration
SqliteQ
The playhouse.sqliteq
module provides a subclass of SqliteExtDatabase
, that will serialize concurrent writes to a SQLite database. SqliteQueueDatabase
can be used as a drop-in replacement for the regular SqliteDatabase
if you want simple read and write access to a SQLite database from multiple threads.
SQLite only allows one connection to write to the database at any given time. As a result, if you have a multi-threaded application (like a web-server, for example) that needs to write to the database, you may see occasional errors when one or more of the threads attempting to write cannot acquire the lock.
SqliteQueueDatabase
is designed to simplify things by sending all write queries through a single, long-lived connection. The benefit is that you get the appearance of multiple threads writing to the database without conflicts or timeouts. The downside, however, is that you cannot issue write transactions that encompass multiple queries – all writes run in autocommit mode, essentially.
Note
The module gets its name from the fact that all write queries get put into a thread-safe queue. A single worker thread listens to the queue and executes all queries that are sent to it.
Transactions
Because all queries are serialized and executed by a single worker thread, it is possible for transactional SQL from separate threads to be executed out-of-order. In the example below, the transaction started by thread “B” is rolled back by thread “A” (with bad consequences!):
- Thread A: UPDATE transplants SET organ=’liver’, …;
- Thread B: BEGIN TRANSACTION;
- Thread B: UPDATE life_support_system SET timer += 60 …;
- Thread A: ROLLBACK; – Oh no….
Since there is a potential for queries from separate transactions to be interleaved, the transaction()
and atomic()
methods are disabled on SqliteQueueDatabase
.
For cases when you wish to temporarily write to the database from a different thread, you can use the pause()
and unpause()
methods. These methods block the caller until the writer thread is finished with its current workload. The writer then disconnects and the caller takes over until unpause
is called.
The stop()
, start()
, and is_stopped()
methods can also be used to control the writer thread.
Note
Take a look at SQLite’s isolation documentation for more information about how SQLite handles concurrent connections.
Code sample
Creating a database instance does not require any special handling. The SqliteQueueDatabase
accepts some special parameters which you should be aware of, though. If you are using gevent, you must specify use_gevent=True
when instantiating your database – this way Peewee will know to use the appropriate objects for handling queueing, thread creation, and locking.
from playhouse.sqliteq import SqliteQueueDatabase
db = SqliteQueueDatabase(
'my_app.db',
use_gevent=False, # Use the standard library "threading" module.
autostart=False, # The worker thread now must be started manually.
queue_max_size=64, # Max. # of pending writes that can accumulate.
results_timeout=5.0) # Max. time to wait for query to be executed.
If autostart=False
, as in the above example, you will need to call start()
to bring up the worker threads that will do the actual write query execution.
@app.before_first_request
def _start_worker_threads():
db.start()
If you plan on performing SELECT queries or generally wanting to access the database, you will need to call connect()
and close()
as you would with any other database instance.
When your application is ready to terminate, use the stop()
method to shut down the worker thread. If there was a backlog of work, then this method will block until all pending work is finished (though no new work is allowed).
import atexit
@atexit.register
def _stop_worker_threads():
db.stop()
Lastly, the is_stopped()
method can be used to determine whether the database writer is up and running.
Sqlite User-Defined Functions
The sqlite_udf
playhouse module contains a number of user-defined functions, aggregates, and table-valued functions, which you may find useful. The functions are grouped in collections and you can register these user-defined extensions individually, by collection, or register everything.
Scalar functions are functions which take a number of parameters and return a single value. For example, converting a string to upper-case, or calculating the MD5 hex digest.
Aggregate functions are like scalar functions that operate on multiple rows of data, producing a single result. For example, calculating the sum of a list of integers, or finding the smallest value in a particular column.
Table-valued functions are simply functions that can return multiple rows of data. For example, a regular-expression search function that returns all the matches in a given string, or a function that accepts two dates and generates all the intervening days.
Note
To use table-valued functions, you will need to build the playhouse._sqlite_ext
C extension.
Registering user-defined functions:
db = SqliteDatabase('my_app.db')
# Register *all* functions.
register_all(db)
# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')
# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')
Using a library function (“hostname”):
# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
.select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
.group_by(fn.hostname(Link.url))
.order_by(fn.COUNT(Link.id).desc())
.tuples())
# Print the hostname along with number of links associated with it.
for host, count in query:
print('%s: %s' % (host, count))
Functions, listed by collection name
Scalar functions are indicated by (f)
, aggregate functions by (a)
, and table-valued functions by (t)
.
CONTROL_FLOW
if_then_else
(cond, truthy[, falsey=None])
Simple ternary-type operator, where, depending on the truthiness of the cond
parameter, either the truthy
or falsey
value will be returned.
DATE
strip_tz
(date_str)
Parameters: | date_str – A datetime, encoded as a string. |
---|---|
Returns: | The datetime with any timezone info stripped off. |
The time is not adjusted in any way, the timezone is simply removed.
humandelta
(nseconds[, glue=’, ‘])
Parameters: |
|
---|---|
Returns: | Easy-to-read description of timedelta. |
Example, 86471 -> “1 day, 1 minute, 11 seconds”
mintdiff
(datetime_value)
Parameters: | datetime_value – A date-time. |
---|---|
Returns: | Minimum difference between any two values in list. |
Aggregate function that computes the minimum difference between any two datetimes.
avgtdiff
(datetime_value)
Parameters: | datetime_value – A date-time. |
---|---|
Returns: | Average difference between values in list. |
Aggregate function that computes the average difference between consecutive values in the list.
duration
(datetime_value)
Parameters: | datetime_value – A date-time. |
---|---|
Returns: | Duration from smallest to largest value in list, in seconds. |
Aggregate function that computes the duration from the smallest to the largest value in the list, returned in seconds.
date_series
(start, stop[, step_seconds=86400])
Parameters: |
|
---|
Table-value function that returns rows consisting of the date/+time values encountered iterating from start to stop, step_seconds
at a time.
Additionally, if start does not have a time component and step_seconds is greater-than-or-equal-to one day (86400 seconds), the values returned will be dates. Conversely, if start does not have a date component, values will be returned as times. Otherwise values are returned as datetimes.
Example:
SELECT * FROM date_series('2017-01-28', '2017-02-02');
value
-----
2017-01-28
2017-01-29
2017-01-30
2017-01-31
2017-02-01
2017-02-02
FILE
file_ext
(filename)
Parameters: | filename (str) – Filename to extract extension from. |
---|---|
Returns: | Returns the file extension, including the leading “.”. |
file_read
(filename)
Parameters: | filename (str) – Filename to read. |
---|---|
Returns: | Contents of the file. |
HELPER
gzip
(data[, compression=9])
Parameters: |
|
---|---|
Returns: | Compressed binary data. |
gunzip
(data)
Parameters: | data (bytes) – Compressed data. |
---|---|
Returns: | Uncompressed binary data. |
hostname
(url)
Parameters: | url (str) – URL to extract hostname from. |
---|---|
Returns: | hostname portion of URL |
toggle
(key)
Parameters: | key – Key to toggle. |
---|
Toggle a key between True/False state. Example:
>>> toggle('my-key')
True
>>> toggle('my-key')
False
>>> toggle('my-key')
True
setting
(key[, value=None])
Parameters: |
|
---|---|
Returns: | Value associated with key. |
Store/retrieve a setting in memory and persist during lifetime of application. To get the current value, only specify the key. To set a new value, call with key and new value.
clear_toggles
()
Clears all state associated with the toggle()
function.
clear_settings
()
Clears all state associated with the setting()
function.
MATH
randomrange
(start[, stop=None[, step=None]])
Parameters: |
|
---|
Return a random integer between [start, end)
.
gauss_distribution
(mean, sigma)
Parameters: |
|
---|
sqrt
(n)
Calculate the square root of n
.
tonumber
(s)
Parameters: | s (str) – String to convert to number. |
---|---|
Returns: | Integer, floating-point or NULL on failure. |
mode
(val)
Parameters: | val – Numbers in list. |
---|---|
Returns: | The mode, or most-common, number observed. |
Aggregate function which calculates mode of values.
minrange
(val)
Parameters: | val – Value |
---|---|
Returns: | Min difference between two values. |
Aggregate function which calculates the minimal distance between two numbers in the sequence.
avgrange
(val)
Parameters: | val – Value |
---|---|
Returns: | Average difference between values. |
Aggregate function which calculates the average distance between two consecutive numbers in the sequence.
range
(val)
Parameters: | val – Value |
---|---|
Returns: | The range from the smallest to largest value in sequence. |
Aggregate function which returns range of values observed.
median
(val)
Parameters: | val – Value |
---|---|
Returns: | The median, or middle, value in a sequence. |
Aggregate function which calculates the middle value in a sequence.
Note
Only available if you compiled the _sqlite_udf
extension.
STRING
substr_count
(haystack, needle)
Returns number of times needle
appears in haystack
.
strip_chars
(haystack, chars)
Strips any characters in chars
from beginning and end of haystack
.
damerau_levenshtein_dist
(s1, s2)
Computes the edit distance from s1 to s2 using the damerau variant of the levenshtein algorithm.
Note
Only available if you compiled the _sqlite_udf
extension.
levenshtein_dist
(s1, s2)
Computes the edit distance from s1 to s2 using the levenshtein algorithm.
Note
Only available if you compiled the _sqlite_udf
extension.
str_dist
(s1, s2)
Computes the edit distance from s1 to s2 using the standard library SequenceMatcher’s algorithm.
Note
Only available if you compiled the _sqlite_udf
extension.
regex_search
(regex, search_string)
Parameters: |
|
---|
Table-value function that searches a string for substrings that match the provided regex
. Returns rows for each match found.
Example:
SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols');
value
-----
extract
words
ignore
symbols
apsw, an advanced sqlite driver
The apsw_ext
module contains a database class suitable for use with the apsw sqlite driver.
APSW Project page: https://github.com/rogerbinns/apsw
APSW is a really neat library that provides a thin wrapper on top of SQLite’s C interface, making it possible to use all of SQLite’s advanced features.
Here are just a few reasons to use APSW, taken from the documentation:
- APSW gives all functionality of SQLite, including virtual tables, virtual file system, blob i/o, backups and file control.
- Connections can be shared across threads without any additional locking.
- Transactions are managed explicitly by your code.
- APSW can handle nested transactions.
- Unicode is handled correctly.
- APSW is faster.
For more information on the differences between apsw and pysqlite, check the apsw docs.
How to use the APSWDatabase
from apsw_ext import *
db = APSWDatabase(':memory:')
class BaseModel(Model):
class Meta:
database = db
class SomeModel(BaseModel):
col1 = CharField()
col2 = DateTimeField()
apsw_ext API notes
APSWDatabase
extends the SqliteExtDatabase
and inherits its advanced features.
class APSWDatabase
(database, \*connect_kwargs*)
Parameters: |
|
---|
register_module
(mod_name, mod_inst)Provides a way of globally registering a module. For more information, see the documentation on virtual tables.
Parameters: - mod_name (string) – name to use for module
- mod_inst (object) – an object implementing the Virtual Table interface
unregister_module
(mod_name)Unregister a module.
Parameters: mod_name (string) – name to use for module
Note
Be sure to use the Field
subclasses defined in the apsw_ext
module, as they will properly handle adapting the data types for storage.
For example, instead of using peewee.DateTimeField
, be sure you are importing and using playhouse.apsw_ext.DateTimeField
.
Sqlcipher backend
- Although this extention’s code is short, it has not been properly peer-reviewed yet and may have introduced vulnerabilities.
- The code contains minimum values for passphrase length and kdf_iter, as well as a default value for the later. Do not regard these numbers as advice. Consult the docs at http://sqlcipher.net/sqlcipher-api/ and security experts.
Also note that this code relies on pysqlcipher and sqlcipher, and the code there might have vulnerabilities as well, but since these are widely used crypto modules, we can expect “short zero days” there.
sqlcipher_ext API notes
class SqlCipherDatabase
(database, passphrase, kdf_iter=64000, \*kwargs*)
Subclass of SqliteDatabase
that stores the database encrypted. Instead of the standard sqlite3
backend, it uses pysqlcipher: a python wrapper for sqlcipher, which – in turn – is an encrypted wrapper around sqlite3
, so the API is identical to SqliteDatabase
’s, except for object construction parameters:
Parameters: |
|
---|
- If the
database
file doesn’t exist, it will be created with encryption by a key derived frompasshprase
withkdf_iter
PBKDF2 iterations. When trying to open an existing database,
passhprase
andkdf_iter
should be identical to the ones used when it was created.rekey
(passphrase)Parameters: passphrase (str) – New passphrase for database. Change the passphrase for database.
Notes:
[Hopefully] there’s no way to tell whether the passphrase is wrong or the file is corrupt. In both cases – the first time we try to acces the database – a
DatabaseError
error is raised, with the exact message:"file is encrypted or is not a database"
.As mentioned above, this only happens when you access the databse, so if you need to know right away whether the passphrase was correct, you can trigger this check by calling [e.g.]
get_tables()
(see example below).Most applications can expect failed attempts to open the database (common case: prompting the user for
passphrase
), so the database can’t be hardwired into theMeta
of model classes. To defer initialization, pass None in to the database.
Example:
db = SqlCipherDatabase(None)
class BaseModel(Model):
"""Parent for all app's models"""
class Meta:
# We won't have a valid db until user enters passhrase.
database = db
# Derive our model subclasses
class Person(BaseModel):
name = TextField(primary_key=True)
right_passphrase = False
while not right_passphrase:
db.init(
'testsqlcipher.db',
passphrase=get_passphrase_from_user())
try: # Actually execute a query against the db to test passphrase.
db.get_tables()
except DatabaseError as exc:
# This error indicates the password was wrong.
if exc.args[0] == 'file is encrypted or is not a database':
tell_user_the_passphrase_was_wrong()
db.init(None) # Reset the db.
else:
raise exc
else:
# The password was correct.
right_passphrase = True
See also: a slightly more elaborate example.
Postgresql Extensions
The postgresql extensions module provides a number of “postgres-only” functions, currently:
- hstore support
- json support, including jsonb for Postgres 9.4.
- server-side cursors
- full-text search
ArrayField
field type, for storing arrays.HStoreField
field type, for storing key/value pairs.IntervalField
field type, for storingtimedelta
objects.JSONField
field type, for storing JSON data.BinaryJSONField
field type for thejsonb
JSON data type.TSVectorField
field type, for storing full-text search data.DateTimeTZ
field type, a timezone-aware datetime field.
In the future I would like to add support for more of postgresql’s features. If there is a particular feature you would like to see added, please open a Github issue.
Warning
In order to start using the features described below, you will need to use the extension PostgresqlExtDatabase
class instead of PostgresqlDatabase
.
The code below will assume you are using the following database and base model:
from playhouse.postgres_ext import *
ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')
class BaseExtModel(Model):
class Meta:
database = ext_db
hstore support
Postgresql hstore is an embedded key/value store. With hstore, you can store arbitrary key/value pairs in your database alongside structured relational data.
To use hstore
, you need to specify an additional parameter when instantiating your PostgresqlExtDatabase
:
# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)
Currently the postgres_ext
module supports the following operations:
- Store and retrieve arbitrary dictionaries
- Filter by key(s) or partial dictionary
- Update/add one or more keys to an existing dictionary
- Delete one or more keys from an existing dictionary
- Select keys, values, or zip keys and values
- Retrieve a slice of keys/values
- Test for the existence of a key
- Test that a key has a non-NULL value
Using hstore
To start with, you will need to import the custom database class and the hstore functions from playhouse.postgres_ext
(see above code snippet). Then, it is as simple as adding a HStoreField
to your model:
class House(BaseExtModel):
address = CharField()
features = HStoreField()
You can now store arbitrary key/value pairs on House
instances:
>>> h = House.create(
... address='123 Main St',
... features={'garage': '2 cars', 'bath': '2 bath'})
...
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}
You can filter by individual key, multiple keys or partial dictionary:
>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
Suppose you want to do an atomic update to the house:
>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}
Or, alternatively an atomic delete:
>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}
Multiple keys can be deleted at the same time:
>>> query = House.update(features=House.features.delete('garage', 'sqft'))
You can select just keys, just values, or zip the two:
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
... print(h.address, h.keys)
123 Main St [u'bath', u'garage']
>>> for h in House.select(House.address, House.features.values().alias('vals')):
... print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
>>> for h in House.select(House.address, House.features.items().alias('mtx')):
... print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
You can retrieve a slice of data, for example, all the garage data:
>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
... print(house.address, house.garage_data)
123 Main St {'garage': '2 cars'}
You can check for the existence of a key and filter rows accordingly:
>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
... print(house.address, house.has_garage)
123 Main St True
>>> for house in House.select().where(House.features.exists('garage')):
... print(house.address, house.features['garage']) # <-- just houses w/garage data
123 Main St 2 cars
Interval support
Postgres supports durations through the INTERVAL
data-type (docs).
class IntervalField
([null=False[, …]])
Field class capable of storing Python datetime.timedelta
instances.
Example:
from datetime import timedelta
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_db')
class Event(Model):
location = CharField()
duration = IntervalField()
start_time = DateTimeField()
class Meta:
database = db
@classmethod
def get_long_meetings(cls):
return cls.select().where(cls.duration > timedelta(hours=1))
JSON Support
peewee has basic support for Postgres’ native JSON data type, in the form of JSONField
. As of version 2.4.7, peewee also supports the Postgres 9.4 binary json jsonb
type, via BinaryJSONField
.
Warning
Postgres supports a JSON data type natively as of 9.2 (full support in 9.3). In order to use this functionality you must be using the correct version of Postgres with psycopg2 version 2.5 or greater.
To use BinaryJSONField
, which has many performance and querying advantages, you must have Postgres 9.4 or later.
Note
You must be sure your database is an instance of PostgresqlExtDatabase
in order to use the JSONField.
Here is an example of how you might declare a model with a JSON field:
import json
import urllib2
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_database')
class APIResponse(Model):
url = CharField()
response = JSONField()
class Meta:
database = db
@classmethod
def request(cls, url):
fh = urllib2.urlopen(url)
return cls.create(url=url, response=json.loads(fh.read()))
APIResponse.create_table()
# Store a JSON response.
offense = APIResponse.request('http://crime-api.com/api/offense/')
booking = APIResponse.request('http://crime-api.com/api/booking/')
# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
APIResponse.response['meta']['model'] == 'offense')
# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
.select(
APIResponse.data['booking']['person'].as_json().alias('person'))
.where(APIResponse.data['meta']['model'] == 'booking'))
for result in q:
print(result.person['name'], result.person['dob'])
The BinaryJSONField
works the same and supports the same operations as the regular JSONField
, but provides several additional operations for testing containment. Using the binary json field, you can test whether your JSON data contains other partial JSON structures (contains()
, contains_any()
, contains_all()
), or whether it is a subset of a larger JSON document (contained_by()
).
For more examples, see the JSONField
and BinaryJSONField
API documents below.
Server-side cursors
When psycopg2 executes a query, normally all results are fetched and returned to the client by the backend. This can cause your application to use a lot of memory when making large queries. Using server-side cursors, results are returned a little at a time (by default 2000 records). For the definitive reference, please see the psycopg2 documentation.
Note
To use server-side (or named) cursors, you must be using PostgresqlExtDatabase
.
To execute a query using a server-side cursor, simply wrap your select query using the ServerSide()
helper:
large_query = PageView.select() # Build query normally.
# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
# do some interesting analysis here.
pass
# Server-side resources are released.
If you would like all SELECT
queries to automatically use a server-side cursor, you can specify this when creating your PostgresqlExtDatabase
:
from postgres_ext import PostgresqlExtDatabase
ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)
Note
Server-side cursors live only as long as the transaction, so for this reason peewee will not automatically call commit()
after executing a SELECT
query. If you do not commit
after you are done iterating, you will not release the server-side resources until the connection is closed (or the transaction is committed later). Furthermore, since peewee will by default cache rows returned by the cursor, you should always call .iterator()
when iterating over a large query.
If you are using the ServerSide()
helper, the transaction and call to iterator()
will be handled transparently.
Full-text search
Postgresql provides sophisticated full-text search using special data-types (tsvector
and tsquery
). Documents should be stored or converted to the tsvector
type, and search queries should be converted to tsquery
.
For simple cases, you can simply use the Match()
function, which will automatically perform the appropriate conversions, and requires no schema changes:
def blog_search(query):
return Blog.select().where(
(Blog.status == Blog.STATUS_PUBLISHED) &
Match(Blog.content, query))
The Match()
function will automatically convert the left-hand operand to a tsvector
, and the right-hand operand to a tsquery
. For better performance, it is recommended you create a GIN
index on the column you plan to search:
CREATE INDEX blog_full_text_search ON blog USING gin(to_tsvector(content));
Alternatively, you can use the TSVectorField
to maintain a dedicated column for storing tsvector
data:
class Blog(Model):
content = TextField()
search_content = TSVectorField()
You will need to explicitly convert the incoming text data to tsvector
when inserting or updating the search_content
field:
content = 'Excellent blog post about peewee ORM.'
blog_entry = Blog.create(
content=content,
search_content=fn.to_tsvector(content))
Note
If you are using the TSVectorField
, it will automatically be created with a GIN index.
postgres_ext API notes
class PostgresqlExtDatabase
(database[, server_side_cursors=False[, register_hstore=False[, …]]])
Identical to PostgresqlDatabase
but required in order to support:
Parameters: |
|
---|
If you wish to use the HStore extension, you must specify register_hstore=True
.
If using server_side_cursors
, also be sure to wrap your queries with ServerSide()
.
ServerSide
(select_query)
Parameters: | select_query – a SelectQuery instance. |
---|---|
Rtype generator: | |
Wrap the given select query in a transaction, and call it’s iterator()
method to avoid caching row instances. In order for the server-side resources to be released, be sure to exhaust the generator (iterate over all the rows).
Usage:
large_query = PageView.select()
for page_view in ServerSide(large_query):
# Do something interesting.
pass
# At this point server side resources are released.
class ArrayField
([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
Parameters: |
|
---|
Field capable of storing arrays of the provided field_class.
Note
By default ArrayField will use a GIN index. To disable this, initialize the field with index=False
.
You can store and retrieve lists (or lists-of-lists):
class BlogPost(BaseModel):
content = TextField()
tags = ArrayField(CharField)
post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])
Additionally, you can use the __getitem__
API to query values or slices in the database:
# Get the first tag on a given blog post.
first_tag = (BlogPost
.select(BlogPost.tags[0].alias('first_tag'))
.where(BlogPost.id == 1)
.dicts()
.get())
# first_tag = {'first_tag': 'foo'}
Get a slice of values:
# Get the first two tags.
two_tags = (BlogPost
.select(BlogPost.tags[:2].alias('two'))
.dicts()
.get())
# two_tags = {'two': ['foo', 'bar']}
contains
(\items*)Parameters: items – One or more items that must be in the given array field. # Get all blog posts that are tagged with both "python" and "django".
Blog.select().where(Blog.tags.contains('python', 'django'))
contains_any
(\items*)Parameters: items – One or more items to search for in the given array field. Like
contains()
, except will match rows where the array contains any of the given items.# Get all blog posts that are tagged with "flask" and/or "django".
Blog.select().where(Blog.tags.contains_any('flask', 'django'))
class DateTimeTZField
(\args, **kwargs*)
A timezone-aware subclass of DateTimeField
.
class HStoreField
(\args, **kwargs*)
A field for storing and retrieving arbitrary key/value pairs. For details on usage, see hstore support.
Attention
To use the HStoreField
you will need to be sure the hstore extension is registered with the connection. To accomplish this, instantiate the PostgresqlExtDatabase
with register_hstore=True
.
Note
By default HStoreField
will use a GiST index. To disable this, initialize the field with index=False
.
keys
()Returns the keys for a given row.
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
... print(h.address, h.keys)
123 Main St [u'bath', u'garage']
values
()Return the values for a given row.
>>> for h in House.select(House.address, House.features.values().alias('vals')):
... print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
items
()Like python’s
dict
, return the keys and values in a list-of-lists:>>> for h in House.select(House.address, House.features.items().alias('mtx')):
... print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
slice
(\args*)Return a slice of data given a list of keys.
>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')):
... print(h.address, h.garage_data)
123 Main St {'garage': '2 cars'}
exists
(key)Query for whether the given key exists.
>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')):
... print(h.address, h.has_garage)
123 Main St True
>>> for h in House.select().where(House.features.exists('garage')):
... print(h.address, h.features['garage']) # <-- just houses w/garage data
123 Main St 2 cars
defined
(key)Query for whether the given key has a value associated with it.
update
(\*data*)Perform an atomic update to the keys/values for a given row or rows.
>>> query = House.update(features=House.features.update(
... sqft=2000,
... year_built=2012))
>>> query.where(House.id == 1).execute()
delete
(\keys*)Delete the provided keys for a given row or rows.
Note
We will use an
UPDATE
query.>>> query = House.update(features=House.features.delete(
... 'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()
contains
(value)Parameters: value – Either a dict
, alist
of keys, or a single key.Query rows for the existence of either:
- a partial dictionary.
- a list of keys.
- a single key.
>>> query = House.select()
>>> has_garage = query.where(House.features.contains('garage'))
>>> garage_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
contains_any
(\keys*)Parameters: keys – One or more keys to search for. Query rows for the existince of any key.
class JSONField
(dumps=None, \args, **kwargs*)
Parameters: | dumps – The default is to call json.dumps() or the dumps function. You can override this method to create a customized JSON wrapper. |
---|
Field class suitable for storing and querying arbitrary JSON. When using this on a model, set the field’s value to a Python object (either a dict
or a list
). When you retrieve your value from the database it will be returned as a Python data structure.
Note
You must be using Postgres 9.2 / psycopg2 2.5 or greater.
Note
If you are using Postgres 9.4, strongly consider using the BinaryJSONField
instead as it offers better performance and more powerful querying options.
Example model declaration:
db = PostgresqlExtDatabase('my_db')
class APIResponse(Model):
url = CharField()
response = JSONField()
class Meta:
database = db
Example of storing JSON data:
url = 'http://foo.com/api/resource/'
resp = json.loads(urllib2.urlopen(url).read())
APIResponse.create(url=url, response=resp)
APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})
To query, use Python’s []
operators to specify nested key or array lookups:
APIResponse.select().where(
APIResponse.response['key1']['nested-key'] == 'some-value')
To illustrate the use of the []
operators, imagine we have the following data stored in an APIResponse
:
{
"foo": {
"bar": ["i1", "i2", "i3"],
"baz": {
"huey": "mickey",
"peewee": "nugget"
}
}
}
Here are the results of a few queries:
def get_data(expression):
# Helper function to just retrieve the results of a
# particular expression.
query = (APIResponse
.select(expression.alias('my_data'))
.dicts()
.get())
return query['my_data']
# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.data['foo']['bar'])
# '["i1", "i2", "i3"]'
# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.data['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']
# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.data['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'
# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.data['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}
# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.data['foo']['bar'][0])
# 'i1'
# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.data['foo']['bar'][0].as_json())
# 'i1'
class BinaryJSONField
(dumps=None, \args, **kwargs*)
Parameters: | dumps – The default is to call json.dumps() or the dumps function. You can override this method to create a customized JSON wrapper. |
---|
Store and query arbitrary JSON documents. Data should be stored using normal Python dict
and list
objects, and when data is returned from the database, it will be returned using dict
and list
as well.
For examples of basic query operations, see the above code samples for JSONField
. The example queries below will use the same APIResponse
model described above.
Note
By default BinaryJSONField will use a GiST index. To disable this, initialize the field with index=False
.
Note
You must be using Postgres 9.4 / psycopg2 2.5 or newer. If you are using Postgres 9.2 or 9.3, you can use the regular JSONField
instead.
contains
(other)Test whether the given JSON data contains the given JSON fragment or key.
Example:
search_fragment = {
'foo': {'bar': ['i2']}
}
query = (APIResponse
.select()
.where(APIResponse.data.contains(search_fragment)))
# If we're searching for a list, the list items do not need to
# be ordered in a particular way:
query = (APIResponse
.select()
.where(APIResponse.data.contains({
'foo': {'bar': ['i2', 'i1']}})))
We can pass in simple keys as well. To find APIResponses that contain the key
foo
at the top-level:APIResponse.select().where(APIResponse.data.contains('foo'))
We can also search sub-keys using square-brackets:
APIResponse.select().where(
APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
contains_any
(\items*)Search for the presence of one or more of the given items.
APIResponse.select().where(
APIResponse.data.contains_any('foo', 'baz', 'nugget'))
Like
contains()
, we can also search sub-keys:APIResponse.select().where(
APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
contains_all
(\items*)Search for the presence of all of the given items.
APIResponse.select().where(
APIResponse.data.contains_all('foo'))
Like
contains_any()
, we can also search sub-keys:APIResponse.select().where(
APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
contained_by
(other)Test whether the given JSON document is contained by (is a subset of) the given JSON document. This method is the inverse of
contains()
.big_doc = {
'foo': {
'bar': ['i1', 'i2', 'i3'],
'baz': {
'huey': 'mickey',
'peewee': 'nugget',
}
},
'other_key': ['nugget', 'bear', 'kitten'],
}
APIResponse.select().where(
APIResponse.data.contained_by(big_doc))
Match
(field, query)
Generate a full-text search expression, automatically converting the left-hand operand to a tsvector
, and the right-hand operand to a tsquery
.
Example:
def blog_search(query):
return Blog.select().where(
(Blog.status == Blog.STATUS_PUBLISHED) &
Match(Blog.content, query))
class TSVectorField
Field type suitable for storing tsvector
data. This field will automatically be created with a GIN
index for improved search performance.
Note
Data stored in this field will still need to be manually converted to the tsvector
type.
Note
By default TSVectorField will use a GIN index. To disable this, initialize the field with
index=False
.
Example usage:
class Blog(Model):
content = TextField()
search_content = TSVectorField()
content = 'this is a sample blog entry.'
blog_entry = Blog.create(
content=content,
search_content=fn.to_tsvector(content)) # Note `to_tsvector()`.
MySQL Extensions
Peewee provides an alternate database implementation for using the mysql-connector driver. The implementation can be found in playhouse.mysql_ext
.
Example usage:
from playhouse.mysql_ext import MySQLConnectorDatabase
# MySQL database implementation that utilizes mysql-connector driver.
db = MySQLConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
DataSet
The dataset module contains a high-level API for working with databases modeled after the popular project of the same name. The aims of the dataset module are to provide:
- A simplified API for working with relational data, along the lines of working with JSON.
- An easy way to export relational data as JSON or CSV.
- An easy way to import JSON or CSV data into a relational database.
A minimal data-loading script might look like this:
from playhouse.dataset import DataSet
db = DataSet('sqlite:///:memory:')
table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')
huey = table.find_one(name='Huey')
print huey
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
for obj in table:
print obj
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}
You can export or import data using freeze()
and thaw()
:
# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')
# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')
Getting started
DataSet
objects are initialized by passing in a database URL of the format dialect://user:password@host/dbname
. See the Database URL section for examples of connecting to various databases.
# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')
Storing data
To store data, we must first obtain a reference to a table. If the table does not exist, it will be created automatically:
# Get a table reference, creating the table if it does not exist.
table = db['users']
We can now insert()
new rows into the table. If the columns do not exist, they will be created automatically:
table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')
To update existing entries in the table, pass in a dictionary containing the new values and filter conditions. The list of columns to use as filters is specified in the columns argument. If no filter columns are specified, then all rows will be updated.
# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])
# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')
Importing data
To import data from an external source, such as a JSON or CSV file, you can use the thaw()
method. By default, new columns will be created for any attributes encountered. If you wish to only populate columns that are already defined on a table, you can pass in strict=True
.
# Load data from a JSON file containing a list of objects.
table = dataset['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]
# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
{'id': 2, 'ticker': 'AAPL', 'price': 109},
{'id': 3, 'ticker': 'AMZN', 'price': 300}]
Using transactions
DataSet supports nesting transactions using a simple context manager.
table = db['users']
with db.transaction() as txn:
table.insert(name='Charlie')
with db.transaction() as nested_txn:
# Set Charlie's favorite ORM to Django.
table.update(name='Charlie', favorite_orm='django', columns=['name'])
# jk/lol
nested_txn.rollback()
Inspecting the database
You can use the tables()
method to list the tables in the current database:
>>> print db.tables
['sometable', 'user']
And for a given table, you can print the columns:
>>> table = db['user']
>>> print table.columns
['id', 'age', 'name', 'gender', 'favorite_orm']
We can also find out how many rows are in a table:
>>> print len(db['user'])
3
Reading data
To retrieve all rows, you can use the all()
method:
# Retrieve all the users.
users = db['user'].all()
# We can iterate over all rows without calling `.all()`
for user in db['user']:
print user['name']
Specific objects can be retrieved using find()
and find_one()
.
# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')
# Find Huey.
huey = db['user'].find_one(name='Huey')
Exporting data
To export data, use the freeze()
method, passing in the query you wish to export:
peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')
API
class DataSet
(url)
Parameters: | url (str) – A database URL. See Database URL for examples. |
---|
The DataSet class provides a high-level API for working with relational databases.
tables
Return a list of tables stored in the database. This list is computed dynamically each time it is accessed.
__getitem__
(table_name)Provide a
Table
reference to the specified table. If the table does not exist, it will be created.query
(sql[, params=None[, commit=True]])Parameters: - sql (str) – A SQL query.
- params (list) – Optional parameters for the query.
- commit (bool) – Whether the query should be committed upon execution.
Returns: A database cursor.
Execute the provided query against the database.
transaction
()Create a context manager representing a new transaction (or savepoint).
freeze
(query[, format=’csv’[, filename=None[, file_obj=None[, \*kwargs*]]]])Parameters: - query – A
SelectQuery
, generated usingall()
or ~Table.find. - format – Output format. By default, csv and json are supported.
- filename – Filename to write output to.
- file_obj – File-like object to write output to.
- kwargs – Arbitrary parameters for export-specific functionality.
- query – A
thaw
(table[, format=’csv’[, filename=None[, file_obj=None[, strict=False[, \*kwargs*]]]]])Parameters: - table (str) – The name of the table to load data into.
- format – Input format. By default, csv and json are supported.
- filename – Filename to read data from.
- file_obj – File-like object to read data from.
- strict (bool) – Whether to store values for columns that do not already exist on the table.
- kwargs – Arbitrary parameters for import-specific functionality.
connect
()Open a connection to the underlying database. If a connection is not opened explicitly, one will be opened the first time a query is executed.
close
()Close the connection to the underlying database.
class Table
(dataset, name, model_class)
Provides a high-level API for working with rows in a given table.
columns
Return a list of columns in the given table.
model_class
A dynamically-created
Model
class.create_index
(columns[, unique=False])Create an index on the given columns:
# Create a unique index on the `username` column.
db['users'].create_index(['username'], unique=True)
insert
(\*data*)Insert the given data dictionary into the table, creating new columns as needed.
update
(columns=None, conjunction=None, \*data*)Update the table using the provided data. If one or more columns are specified in the columns parameter, then those columns’ values in the data dictionary will be used to determine which rows to update.
# Update all rows.
db['users'].update(favorite_orm='peewee')
# Only update Huey's record, setting his age to 3.
db['users'].update(name='Huey', age=3, columns=['name'])
find
(\*query*)Query the table for rows matching the specified equality conditions. If no query is specified, then all rows are returned.
peewee_users = db['users'].find(favorite_orm='peewee')
find_one
(\*query*)Return a single row matching the specified equality conditions. If no matching row is found then
None
will be returned.huey = db['users'].find_one(name='Huey')
all
()Return all rows in the given table.
delete
(\*query*)Delete all rows matching the given equality conditions. If no query is provided, then all rows will be deleted.
# Adios, Django!
db['users'].delete(favorite_orm='Django')
# Delete all the secret messages.
db['secret_messages'].delete()
freeze
([format=’csv’[, filename=None[, file_obj=None[, \*kwargs*]]]])Parameters: - format – Output format. By default, csv and json are supported.
- filename – Filename to write output to.
- file_obj – File-like object to write output to.
- kwargs – Arbitrary parameters for export-specific functionality.
thaw
([format=’csv’[, filename=None[, file_obj=None[, strict=False[, \*kwargs*]]]]])Parameters: - format – Input format. By default, csv and json are supported.
- filename – Filename to read data from.
- file_obj – File-like object to read data from.
- strict (bool) – Whether to store values for columns that do not already exist on the table.
- kwargs – Arbitrary parameters for import-specific functionality.
Fields
These fields can be found in the playhouse.fields
module.
class CompressedField
([compression_level=6[, algorithm=’zlib’[, \*kwargs*]]])
Parameters: |
|
---|
Stores compressed data using the specified algorithm. This field extends BlobField
, transparently storing a compressed representation of the data in the database.
class PickleField
Stores arbitrary Python data by transparently pickling and un-pickling data stored in the field. This field extends BlobField
. If the cPickle
module is available, it will be used.
Hybrid Attributes
Hybrid attributes encapsulate functionality that operates at both the Python and SQL levels. The idea for hybrid attributes comes from a feature of the same name in SQLAlchemy. Consider the following example:
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_method
def contains(self, point):
return (self.start <= point) & (point < self.end)
The hybrid attribute gets its name from the fact that the length
attribute will behave differently depending on whether it is accessed via the Interval
class or an Interval
instance.
If accessed via an instance, then it behaves just as you would expect.
If accessed via the Interval.length
class attribute, however, the length calculation will be expressed as a SQL expression. For example:
query = Interval.select().where(Interval.length > 5)
This query will be equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)
The playhouse.hybrid
module also contains a decorator for implementing hybrid methods which can accept parameters. As with hybrid properties, when accessed via a model instance, then the function executes normally as-written. When the hybrid method is called on the class, however, it will generate a SQL expression.
Example:
query = Interval.select().where(Interval.contains(2))
This query is equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
There is an additional API for situations where the python implementation differs slightly from the SQL implementation. Let’s add a radius
method to the Interval
model. Because this method calculates an absolute value, we will use the Python abs()
function for the instance portion and the fn.ABS()
SQL function for the class portion.
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_property
def radius(self):
return abs(self.length) / 2
@radius.expression
def radius(cls):
return fn.ABS(cls.length) / 2
What is neat is that both the radius
implementations refer to the length
hybrid attribute! When accessed via an Interval
instance, the radius calculation will be executed in Python. When invoked via an Interval
class, we will get the appropriate SQL.
Example:
query = Interval.select().where(Interval.radius < 3)
This query is equivalent to the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)
Pretty neat, right? Thanks for the cool idea, SQLAlchemy!
Hybrid API
class hybrid_method
(func[, expr=None])
Method decorator that allows the definition of a Python object method with both instance-level and class-level behavior.
Example:
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_method
def contains(self, point):
return (self.start <= point) & (point < self.end)
When called with an Interval
instance, the contains
method will behave as you would expect. When called as a classmethod, though, a SQL expression will be generated:
query = Interval.select().where(Interval.contains(2))
Would generate the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
expression
(expr)Method decorator for specifying the SQL-expression producing method.
class hybrid_property
(fget[, fset=None[, fdel=None[, expr=None]]])
Method decorator that allows the definition of a Python object property with both instance-level and class-level behavior.
Examples:
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_property
def radius(self):
return abs(self.length) / 2
@radius.expression
def radius(cls):
return fn.ABS(cls.length) / 2
When accessed on an Interval
instance, the length
and radius
properties will behave as you would expect. When accessed as class attributes, though, a SQL expression will be generated instead:
query = (Interval
.select()
.where(
(Interval.length > 6) &
(Interval.radius >= 3)))
Would generate the following SQL:
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (
(("t1"."end" - "t1"."start") > 6) AND
((abs("t1"."end" - "t1"."start") / 2) >= 3)
)
Key/Value Store
The playhouse.kv
module contains the implementation of a persistent dictionary.
class KeyValue
([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name=’keyvalue’]]]]])
Parameters: |
|
---|
Dictionary-like API for storing key/value data. Like dictionaries, supports the expected APIs, but also has the added capability of accepting expressions for getting, setting and deleting items.
Table is created automatically (if it doesn’t exist) when the KeyValue
is instantiated.
Basic examples:
# Create a key/value store, which uses an in-memory SQLite database
# for data storage.
KV = KeyValue()
# Set (or overwrite) the value for "k1".
KV['k1'] = 'v1'
# Set (or update) multiple keys at once.
KV.update(k2='v2', k3='v3')
# Getting values works as you'd expect.
assert KV['k2'] == 'v2'
# We can also do this:
for value in KV[KV.key > 'k1']:
print(value)
# 'v2'
# 'v3'
# Update multiple values at once using expression:
KV[KV.key > 'k1'] = 'vx'
# What's stored in the KV?
print(dict(KV))
# {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'}
# Delete a single item.
del KV['k2']
# How many items are stored in the KV?
print(len(KV))
# 2
# Delete items that match the given condition.
del KV[KV.key > 'k1']
__contains__
(expr)Parameters: expr – a single key or an expression Returns: Boolean whether key/expression exists. Example:
>>> kv = KeyValue()
>>> kv.update(k1='v1', k2='v2')
>>> 'k1' in kv
True
>>> 'kx' in kv
False
>>> (KV.key < 'k2') in KV
True
>>> (KV.key > 'k2') in KV
False
__len__
()Returns: Count of items stored. __getitem__
(expr)Parameters: expr – a single key or an expression. Returns: value(s) corresponding to key/expression. Raises: KeyError
if single key given and not found.Examples:
>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')
>>> KV['k1']
'v1'
>>> KV['kx']
KeyError: "kx" not found
>>> KV[KV.key > 'k1']
['v2', 'v3']
>>> KV[KV.key < 'k1']
[]
__setitem__
(expr, value)Parameters: - expr – a single key or an expression.
- value – value to set for key(s)
Set value for the given key. If
expr
is an expression, then any keys matching the expression will have their value updated.Example:
>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')
>>> KV['k1'] = 'v1-x'
>>> print(KV['k1'])
'v1-x'
>>> KV[KV.key >= 'k2'] = 'v99'
>>> dict(KV)
{'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
__delitem__
(expr)Parameters: expr – a single key or an expression. Delete the given key. If an expression is given, delete all keys that match the expression.
Example:
>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2, k3=3)
>>> del KV['k1'] # Deletes "k1".
>>> del KV['k1']
KeyError: "k1" does not exist
>>> del KV[KV.key > 'k2'] # Deletes "k3".
>>> del KV[KV.key > 'k99'] # Nothing deleted, no keys match.
keys
()Returns: an iterable of all keys in the table. values
()Returns: an iterable of all values in the table. items
()Returns: an iterable of all key/value pairs in the table. update
([__data=None[, \*mapping*]])Efficiently bulk-insert or replace the given key/value pairs.
Example:
>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2) # Sets 'k1'=1, 'k2'=2.
>>> dict(KV)
{'k1': 1, 'k2': 2}
>>> KV.update(k2=22, k3=3) # Updates 'k2'->22, sets 'k3'=3.
>>> dict(KV)
{'k1': 1, 'k2': 22, 'k3': 3}
>>> KV.update({'k2': -2, 'k4': 4}) # Also can pass a dictionary.
>>> dict(KV)
{'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
Attention
Because Postgresql does not support INSERT + REPLACE, the
KeyValue.update()
method is not supported for Postgresql databases (as it cannot be implemented efficiently).get
(expr[, default=None])Parameters: - expr – a single key or an expression.
- default – default value if key not found.
Returns: value of given key/expr or default if single key not found.
Get the value at the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list will be returned.
pop
(expr[, default=Sentinel])Parameters: - expr – a single key or an expression.
- default – default value if key does not exist.
Returns: value of given key/expr or default if single key not found.
Get value and delete the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list is returned.
clear
()Remove all items from the key-value table.
Shortcuts
This module contains helper functions for expressing things that would otherwise be somewhat verbose or cumbersome using peewee’s APIs. There are also helpers for serializing models to dictionaries and vice-versa.
model_to_dict
(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]]])
Parameters: |
|
---|
Convert a model instance (and optionally any related instances) to a dictionary.
Examples:
>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}
>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}
>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
'id': 1,
'tweets': [
{'id': 1, 'message': 'tweet-1'},
{'id': 2, 'message': 'tweet-2'},
],
'username': 'charlie'
}
>>> model_to_dict(t1)
{
'id': 1,
'message': 'tweet-1',
'user': {
'id': 1,
'username': 'charlie'
}
}
>>> model_to_dict(t2, recurse=False)
{'id': 1, 'message': 'tweet-2', 'user': 1}
dict_to_model
(model_class, data[, ignore_unknown=False])
Parameters: |
|
---|
Convert a dictionary of data to a model instance, creating related instances where appropriate.
Examples:
>>> user_data = {'id': 1, 'username': 'charlie'}
>>> user = dict_to_model(User, user_data)
>>> user
<__main__.User at 0x7fea8fa4d490>
>>> user.username
'charlie'
>>> note_data = {'id': 2, 'text': 'note text', 'user': user_data}
>>> note = dict_to_model(Note, note_data)
>>> note.text
'note text'
>>> note.user.username
'charlie'
>>> user_with_notes = {
... 'id': 1,
... 'username': 'charlie',
... 'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]}
>>> user = dict_to_model(User, user_with_notes)
>>> user.notes[0].text
'note-1'
>>> user.notes[0].user.username
'charlie'
update_model_from_dict
(instance, data[, ignore_unknown=False])
Parameters: |
|
---|
Update a model instance with the given data dictionary.
Signal support
Models with hooks for signals (a-la django) are provided in playhouse.signals
. To use the signals, you will need all of your project’s models to be a subclass of playhouse.signals.Model
, which overrides the necessary methods to provide support for the various signals.
from playhouse.signals import Model, post_save
class MyModel(Model):
data = IntegerField()
@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
put_data_in_cache(instance.data)
Warning
For what I hope are obvious reasons, Peewee signals do not work when you use the Model.insert()
, Model.update()
, or Model.delete()
methods. These methods generate queries that execute beyond the scope of the ORM, and the ORM does not know about which model instances might or might not be affected when the query executes.
Signals work by hooking into the higher-level peewee APIs like Model.save()
and Model.delete_instance()
, where the affected model instance is known ahead of time.
The following signals are provided:
pre_save
Called immediately before an object is saved to the database. Provides an additional keyword argument created
, indicating whether the model is being saved for the first time or updated.
post_save
Called immediately after an object is saved to the database. Provides an additional keyword argument created
, indicating whether the model is being saved for the first time or updated.
pre_delete
Called immediately before an object is deleted from the database when Model.delete_instance()
is used.
post_delete
Called immediately after an object is deleted from the database when Model.delete_instance()
is used.
pre_init
Called when a model class is first instantiated
Connecting handlers
Whenever a signal is dispatched, it will call any handlers that have been registered. This allows totally separate code to respond to events like model save and delete.
The Signal
class provides a connect()
method, which takes a callback function and two optional parameters for “sender” and “name”. If specified, the “sender” parameter should be a single model class and allows your callback to only receive signals from that one model class. The “name” parameter is used as a convenient alias in the event you wish to unregister your signal handler.
Example usage:
from playhouse.signals import *
def post_save_handler(sender, instance, created):
print '%s was just saved' % instance
# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)
All signal handlers accept as their first two arguments sender
and instance
, where sender
is the model class and instance
is the actual model being acted upon.
If you’d like, you can also use a decorator to connect signal handlers. This is functionally equivalent to the above example:
@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
print '%s was just saved' % instance
Signal API
class Signal
Stores a list of receivers (callbacks) and calls them when the “send” method is invoked.
connect
(receiver[, sender=None[, name=None]])Parameters: - receiver (callable) – a callable that takes at least two parameters, a “sender”, which is the Model subclass that triggered the signal, and an “instance”, which is the actual model instance.
- sender (Model) – if specified, only instances of this model class will trigger the receiver callback.
- name (string) – a short alias
Add the receiver to the internal list of receivers, which will be called whenever the signal is sent.
from playhouse.signals import post_save
from project.handlers import cache_buster
post_save.connect(cache_buster, name='project.cache_buster')
disconnect
([receiver=None[, name=None]])Parameters: - receiver (callable) – the callback to disconnect
- name (string) – a short alias
Disconnect the given receiver (or the receiver with the given name alias) so that it no longer is called. Either the receiver or the name must be provided.
post_save.disconnect(name='project.cache_buster')
send
(instance, \args, **kwargs*)Parameters: instance – a model instance Iterates over the receivers and will call them in the order in which they were connected. If the receiver specified a sender, it will only be called if the instance is an instance of the sender.
pwiz, a model generator
pwiz
is a little script that ships with peewee and is capable of introspecting an existing database and generating model code suitable for interacting with the underlying data. If you have a database already, pwiz can give you a nice boost by generating skeleton code with correct column affinities and foreign keys.
If you install peewee using setup.py install
, pwiz will be installed as a “script” and you can just run:
python -m pwiz -e postgresql -u postgres my_postgres_db
This will print a bunch of models to standard output. So you can do this:
python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print [blog.name for blog in Blog.select()]
Option | Meaning | Example |
---|---|---|
-h | show help | |
-e | database backend | -e mysql |
-H | host to connect to | -H remote.db.server |
-p | port to connect on | -p 9001 |
-u | database user | -u postgres |
-P | database password | -P secret |
-s | postgres schema | -s public |
The following are valid parameters for the engine:
- sqlite
- mysql
- postgresql
Schema Migrations
Peewee now supports schema migrations, with well-tested support for Postgresql, SQLite and MySQL. Unlike other schema migration tools, peewee’s migrations do not handle introspection and database “versioning”. Rather, peewee provides a number of helper functions for generating and running schema-altering statements. This engine provides the basis on which a more sophisticated tool could some day be built.
Migrations can be written as simple python scripts and executed from the command-line. Since the migrations only depend on your applications Database
object, it should be easy to manage changing your model definitions and maintaining a set of migration scripts without introducing dependencies.
Example usage
Begin by importing the helpers from the migrate module:
from playhouse.migrate import *
Instantiate a migrator
. The SchemaMigrator
class is responsible for generating schema altering operations, which can then be run sequentially by the migrate()
helper.
# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)
# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)
Use migrate()
to execute one or more operations:
title_field = CharField(default='')
status_field = IntegerField(null=True)
migrate(
migrator.add_column('some_table', 'title', title_field),
migrator.add_column('some_table', 'status', status_field),
migrator.drop_column('some_table', 'old_column'),
)
Warning
Migrations are not run inside a transaction. If you wish the migration to run in a transaction you will need to wrap the call to migrate in a transaction block, e.g.
with my_db.transaction():
migrate(...)
Supported Operations
Add new field(s) to an existing model:
# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')
# Run the migration, specifying the database table, field name and field.
migrate(
migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
migrator.add_column('comment_tbl', 'comment', comment_field),
)
Renaming a field:
# Specify the table, original name of the column, and its new name.
migrate(
migrator.rename_column('story', 'pub_date', 'publish_date'),
migrator.rename_column('story', 'mod_date', 'modified_date'),
)
Dropping a field:
migrate(
migrator.drop_column('story', 'some_old_field'),
)
Making a field nullable or not nullable:
# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
# Make `pub_date` allow NULL values.
migrator.drop_not_null('story', 'pub_date'),
# Prevent `modified_date` from containing NULL values.
migrator.add_not_null('story', 'modified_date'),
)
Renaming a table:
migrate(
migrator.rename_table('story', 'stories_tbl'),
)
Adding an index:
# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
# Create an index on the `pub_date` column.
migrator.add_index('story', ('pub_date',), False),
# Create a multi-column index on the `pub_date` and `status` fields.
migrator.add_index('story', ('pub_date', 'status'), False),
# Create a unique index on the category and title fields.
migrator.add_index('story', ('category_id', 'title'), True),
)
Dropping an index:
# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))
Migrations API
migrate
(\operations*)
Execute one or more schema altering operations.
Usage:
migrate(
migrator.add_column('some_table', 'new_column', CharField(default='')),
migrator.create_index('some_table', ('new_column',)),
)
class SchemaMigrator
(database)
Parameters: | database – a Database instance. |
---|
The SchemaMigrator
is responsible for generating schema-altering statements.
add_column
(table, column_name, field)Parameters: Add a new column to the provided table. The
field
provided will be used to generate the appropriate column definition.Note
If the field is not nullable it must specify a default value.
Note
For non-null fields, the field will initially be added as a null field, then an
UPDATE
statement will be executed to populate the column with the default value. Finally, the column will be marked as not null.drop_column
(table, column_name[, cascade=True])Parameters: - table (str) – Name of the table to drop column from.
- column_name (str) – Name of the column to drop.
- cascade (bool) – Whether the column should be dropped with CASCADE.
rename_column
(table, old_name, new_name)Parameters: - table (str) – Name of the table containing column to rename.
- old_name (str) – Current name of the column.
- new_name (str) – New name for the column.
add_not_null
(table, column)Parameters: - table (str) – Name of table containing column.
- column (str) – Name of the column to make not nullable.
drop_not_null
(table, column)Parameters: - table (str) – Name of table containing column.
- column (str) – Name of the column to make nullable.
rename_table
(old_name, new_name)Parameters: - old_name (str) – Current name of the table.
- new_name (str) – New name for the table.
add_index
(table, columns[, unique=False])Parameters: - table (str) – Name of table on which to create the index.
- columns (list) – List of columns which should be indexed.
- unique (bool) – Whether the new index should specify a unique constraint.
drop_index
(table, index_name):param str table Name of the table containing the index to be dropped. :param str index_name: Name of the index to be dropped.
class PostgresqlMigrator
(database)
Generate migrations for Postgresql databases.
set_search_path
(schema_name)Parameters: schema_name (str) – Schema to use. Set the search path (schema) for the subsequent operations.
class SqliteMigrator
(database)
Generate migrations for SQLite databases.
class MySQLMigrator
(database)
Generate migrations for MySQL databases.
Reflection
The reflection module contains helpers for introspecting existing databases. This module is used internally by several other modules in the playhouse, including DataSet and pwiz, a model generator.
class Introspector
(metadata[, schema=None])
Metadata can be extracted from a database by instantiating an Introspector
. Rather than instantiating this class directly, it is recommended to use the factory method from_database()
.
classmethod
from_database
(database[, schema=None])Parameters: - database – a
Database
instance. - schema (str) – an optional schema (supported by some databases).
Creates an
Introspector
instance suitable for use with the given database.Usage:
db = SqliteDatabase('my_app.db')
introspector = Introspector.from_database(db)
models = introspector.generate_models()
# User and Tweet (assumed to exist in the database) are
# peewee Model classes generated from the database schema.
User = models['user']
Tweet = models['tweet']
- database – a
generate_models
([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False]]]])Parameters: - skip_invalid (bool) – Skip tables whose names are invalid python identifiers.
- table_names (list) – List of table names to generate. If unspecified, models are generated for all tables.
- literal_column_names (bool) – Use column-names as-is. By default, column names are “python-ized”, i.e. mixed-case becomes lower-case.
- bare_fields – SQLite-only. Do not specify data-types for introspected columns.
Returns: A dictionary mapping table-names to model classes.
Introspect the database, reading in the tables, columns, and foreign key constraints, then generate a dictionary mapping each database table to a dynamically-generated
Model
class.
Database URL
This module contains a helper function to generate a database connection from a URL connection string.
connect
(url, \*connect_params*)
Create a Database
instance from the given connection URL.
Examples:
- sqlite:///my_database.db will create a
SqliteDatabase
instance for the filemy_database.db
in the current directory. - sqlite:///:memory: will create an in-memory
SqliteDatabase
instance. - postgresql://postgres:my_password@localhost:5432/my_database will create a
PostgresqlDatabase
instance. A username and password are provided, as well as the host and port to connect to. - mysql://user:passwd@ip:port/my_db will create a
MySQLDatabase
instance for the local MySQL database my_db. - mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 will create a
PooledMySQLDatabase
instance for the local MySQL database my_db with max_connections set to 20 and a stale_timeout setting of 300 seconds.
Supported schemes:
apsw
:APSWDatabase
mysql
:MySQLDatabase
mysql+pool
:PooledMySQLDatabase
postgres
:PostgresqlDatabase
postgres+pool
:PooledPostgresqlDatabase
postgresext
:PostgresqlExtDatabase
postgresext+pool
:PooledPostgresqlExtDatabase
sqlite
:SqliteDatabase
sqliteext
:SqliteExtDatabase
sqlite+pool
:PooledSqliteDatabase
sqliteext+pool
:PooledSqliteExtDatabase
Usage:
import os
from playhouse.db_url import connect
# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
parse
(url)
Parse the information in the given URL into a dictionary containing database
, host
, port
, user
and/or password
. Additional connection arguments can be passed in the URL query string.
If you are using a custom database class, you can use the parse()
function to extract information from a URL which can then be passed in to your database object.
register_database
(db_class, \names*)
Parameters: |
|
---|
Register additional database class under the specified names. This function can be used to extend the connect()
function to support additional schemes. Suppose you have a custom database class for Firebird
named FirebirdDatabase
.
from playhouse.db_url import connect, register_database
register_database(FirebirdDatabase, 'firebird')
db = connect('firebird://my-firebird-db')
Connection pool
The pool
module contains a number of Database
classes that provide connection pooling for PostgreSQL, MySQL and SQLite databases. The pool works by overriding the methods on the Database
class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.
In a multi-threaded application, up to max_connections will be opened. Each thread (or, if using gevent, greenlet) will have it’s own connection.
In a single-threaded application, only one connection will be created. It will be continually recycled until either it exceeds the stale timeout or is closed explicitly (using .manual_close()).
By default, all your application needs to do is ensure that connections are closed when you are finished with them, and they will be returned to the pool. For web applications, this typically means that at the beginning of a request, you will open a connection, and when you return a response, you will close the connection.
Simple Postgres pool example code:
# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase
db = PooledPostgresqlExtDatabase(
'my_app',
max_connections=32,
stale_timeout=300, # 5 minutes.
user='postgres')
class BaseModel(Model):
class Meta:
database = db
That’s it! If you would like finer-grained control over the pool of connections, check out the advanced_connection_management section.
Pool APIs
class PooledDatabase
(database[, max_connections=20[, stale_timeout=None[, timeout=None[, \*kwargs*]]]])
Parameters: |
|
---|
Mixin class intended to be used with a subclass of Database
.
Note
Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.
Note
If the number of open connections exceeds max_connections, a ValueError will be raised.
_connect
(\args, **kwargs*)Request a connection from the pool. If there are no available connections a new one will be opened.
_close
(conn[, close_conn=False])By default conn will not be closed and instead will be returned to the pool of available connections. If close_conn=True, then conn will be closed and not be returned to the pool.
manual_close
()Close the currently-open connection without returning it to the pool.
class PooledPostgresqlDatabase
Subclass of PostgresqlDatabase
that mixes in the PooledDatabase
helper.
class PooledPostgresqlExtDatabase
Subclass of PostgresqlExtDatabase
that mixes in the PooledDatabase
helper. The PostgresqlExtDatabase
is a part of the Postgresql Extensions module and provides support for many Postgres-specific features.
class PooledMySQLDatabase
Subclass of MySQLDatabase
that mixes in the PooledDatabase
helper.
class PooledSqliteDatabase
Persistent connections for SQLite apps.
class PooledSqliteExtDatabase
Persistent connections for SQLite apps, using the SQLite Extensions advanced database driver SqliteExtDatabase
.
Test Utils
Contains utilities helpful when testing peewee projects.
class count_queries
([only_select=False])
Context manager that will count the number of queries executed within the context.
Parameters: | only_select (bool) – Only count SELECT queries. |
---|
with count_queries() as counter:
huey = User.get(User.username == 'huey')
huey_tweets = [tweet.message for tweet in huey.tweets]
assert counter.count == 2
count
The number of queries executed.
get_queries
()Return a list of 2-tuples consisting of the SQL query and a list of parameters.
assert_query_count
(expected[, only_select=False])
Function or method decorator that will raise an AssertionError
if the number of queries executed in the decorated function does not equal the expected number.
class TestMyApp(unittest.TestCase):
@assert_query_count(1)
def test_get_popular_blogs(self):
popular_blogs = Blog.get_popular()
self.assertEqual(
[blog.title for blog in popular_blogs],
["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])
This function can also be used as a context manager:
class TestMyApp(unittest.TestCase):
def test_expensive_operation(self):
with assert_query_count(1):
perform_expensive_operation()
Flask Utils
The playhouse.flask_utils
module contains several helpers for integrating peewee with the Flask web framework.
Database Wrapper
The FlaskDB
class is a wrapper for configuring and referencing a Peewee database from within a Flask application. Don’t let it’s name fool you: it is not the same thing as a peewee database. FlaskDB
is designed to remove the following boilerplate from your flask app:
- Dynamically create a Peewee database instance based on app config data.
- Create a base class from which all your application’s models will descend.
- Register hooks at the start and end of a request to handle opening and closing a database connection.
Basic usage:
import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB
DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'
app = Flask(__name__)
app.config.from_object(__name__)
db_wrapper = FlaskDB(app)
class User(db_wrapper.Model):
username = CharField(unique=True)
class Tweet(db_wrapper.Model):
user = ForeignKeyField(User, backref='tweets')
content = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
The above code example will create and instantiate a peewee PostgresqlDatabase
specified by the given database URL. Request hooks will be configured to establish a connection when a request is received, and automatically close the connection when the response is sent. Lastly, the FlaskDB
class exposes a FlaskDB.Model
property which can be used as a base for your application’s models.
Here is how you can access the wrapped Peewee database instance that is configured for you by the FlaskDB
wrapper:
# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database
@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
with peewee_db.atomic():
# ...
return jsonify({'transfer-id': xid})
Note
The actual peewee database can be accessed using the FlaskDB.database
attribute.
Here is another way to configure a Peewee database using FlaskDB
:
app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')
While the above examples show using a database URL, for more advanced usages you can specify a dictionary of configuration options, or simply pass in a peewee Database
instance:
DATABASE = {
'name': 'my_app_db',
'engine': 'playhouse.pool.PooledPostgresqlDatabase',
'user': 'postgres',
'max_connections': 32,
'stale_timeout': 600,
}
app = Flask(__name__)
app.config.from_object(__name__)
wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database
Using a peewee Database
object:
peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)
Database with Application Factory
If you prefer to use the application factory pattern, the FlaskDB
class implements an init_app()
method.
Using as a factory:
db_wrapper = FlaskDB()
# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
username = CharField(unique=True)
def create_app():
app = Flask(__name__)
app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
db_wrapper.init_app(app)
return app
Query utilities
The flask_utils
module provides several helpers for managing queries in your web app. Some common patterns include:
get_object_or_404
(query_or_model, \query*)
Parameters: |
|
---|
Retrieve the object matching the given query, or return a 404 not found response. A common use-case might be a detail page for a weblog. You want to either retrieve the post matching the given URL, or return a 404.
Example:
@app.route('/blog/<slug>/')
def post_detail(slug):
public_posts = Post.select().where(Post.published == True)
post = get_object_or_404(public_posts, (Post.slug == slug))
return render_template('post_detail.html', post=post)
object_list
(template_name, query[, context_variable=’object_list’[, paginate_by=20[, page_var=’page’[, check_bounds=True[, \*kwargs*]]]]])
Parameters: |
|
---|
Retrieve a paginated list of objects specified by the given query. The paginated object list will be dropped into the context using the given context_variable
, as well as metadata about the current page and total number of pages, and finally any arbitrary context data passed as keyword-arguments.
The page is specified using the page
GET
argument, e.g. /my-object-list/?page=3
would return the third page of objects.
Example:
@app.route('/blog/')
def post_index():
public_posts = (Post
.select()
.where(Post.published == True)
.order_by(Post.timestamp.desc()))
return object_list(
'post_index.html',
query=public_posts,
context_variable='post_list',
paginate_by=10)
The template will have the following context:
post_list
, which contains a list of up to 10 posts.page
, which contains the current page based on the value of thepage
GET
parameter.pagination
, aPaginatedQuery
instance.
class PaginatedQuery
(query_or_model, paginate_by[, page_var=’page’[, check_bounds=False]])
Parameters: |
|
---|
Helper class to perform pagination based on GET
arguments.
get_page
()Return the currently selected page, as indicated by the value of the
page_var
GET
parameter. If no page is explicitly selected, then this method will return 1, indicating the first page.get_page_count
()Return the total number of possible pages.
get_object_list
()Using the value of
get_page()
, return the page of objects requested by the user. The return value is aSelectQuery
with the appropriateLIMIT
andOFFSET
clauses.If
check_bounds
was set toTrue
and the requested page contains no objects, then a 404 will be raised.