Playhouse, extensions to Peewee

Peewee ships with numerous extension modules, collected under the playhouse namespace. Despite the silly name, some of these extensions are very useful, particularly those that expose vendor-specific database features, such as the SQLite Extensions and the Postgresql Extensions.

Below you will find a loosely organized listing of the various modules that make up the playhouse.

Database drivers / vendor-specific database functionality

High-level features

Database management and framework integration

Sqlite Extensions

The Sqlite extensions have been moved to their own page.


The playhouse.sqliteq module provides SqliteQueueDatabase, a subclass of SqliteExtDatabase that serializes concurrent writes to a SQLite database. SqliteQueueDatabase can be used as a drop-in replacement for the regular SqliteDatabase if you want simple read and write access to a SQLite database from multiple threads.

SQLite only allows one connection to write to the database at any given time. As a result, if you have a multi-threaded application (like a web-server, for example) that needs to write to the database, you may see occasional errors when one or more of the threads attempting to write cannot acquire the lock.

SqliteQueueDatabase is designed to simplify things by sending all write queries through a single, long-lived connection. The benefit is that you get the appearance of multiple threads writing to the database without conflicts or timeouts. The downside, however, is that you cannot issue write transactions that encompass multiple queries – all writes run in autocommit mode, essentially.


The module gets its name from the fact that all write queries get put into a thread-safe queue. A single worker thread listens to the queue and executes all queries that are sent to it.
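The queue-and-worker arrangement described above can be sketched with only the standard library. This is a simplified illustration, not the playhouse.sqliteq implementation: the WriteQueue class, its methods and the log table are hypothetical names, and error handling and returning results to the caller are left out.

```python
import queue
import sqlite3
import threading


class WriteQueue:
    """Hypothetical helper: one thread owns the connection and runs every write."""

    def __init__(self, path):
        self._path = path
        self._queue = queue.Queue()  # thread-safe FIFO of pending writes
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.row_count = None

    def start(self):
        self._thread.start()

    def execute(self, sql, params=()):
        # Callers never touch the connection; they only enqueue work.
        self._queue.put((sql, params))

    def stop(self):
        self._queue.put(None)  # sentinel: finish the backlog, then exit
        self._thread.join()

    def _run(self):
        conn = sqlite3.connect(self._path)
        conn.execute('CREATE TABLE IF NOT EXISTS log (msg TEXT)')
        while True:
            item = self._queue.get()
            if item is None:
                break
            sql, params = item
            with conn:  # each write is committed on its own
                conn.execute(sql, params)
        self.row_count = conn.execute('SELECT COUNT(*) FROM log').fetchone()[0]
        conn.close()


def producer(wq, name):
    # Each producer thread enqueues 25 inserts.
    for i in range(25):
        wq.execute('INSERT INTO log (msg) VALUES (?)', ('%s-%d' % (name, i),))


wq = WriteQueue(':memory:')
wq.start()
threads = [threading.Thread(target=producer, args=(wq, 't%d' % n))
           for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
wq.stop()  # blocks until the backlog has been written
print(wq.row_count)
```

Because every statement is committed independently by the worker, a multi-statement transaction issued by one caller could be interleaved with writes from another, which is the limitation discussed in this section.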


Because all queries are serialized and executed by a single worker thread, it is possible for transactional SQL from separate threads to be executed out-of-order. In the example below, the transaction started by thread “B” is rolled back by thread “A” (with bad consequences!):

  • Thread A: UPDATE transplants SET organ='liver', …;
  • Thread B: UPDATE life_support_system SET timer += 60 …;
  • Thread A: ROLLBACK; -- Oh no....

Since there is a potential for queries from separate transactions to be interleaved, the transaction() and atomic() methods are disabled on SqliteQueueDatabase.

For cases when you wish to temporarily write to the database from a different thread, you can use the pause() and unpause() methods. These methods block the caller until the writer thread is finished with its current workload. The writer then disconnects and the caller takes over until unpause is called.

The stop(), start(), and is_stopped() methods can also be used to control the writer thread.


Take a look at SQLite’s isolation documentation for more information about how SQLite handles concurrent connections.

Code sample

Creating a database instance does not require any special handling. The SqliteQueueDatabase accepts some special parameters which you should be aware of, though. If you are using gevent, you must specify use_gevent=True when instantiating your database – this way Peewee will know to use the appropriate objects for handling queueing, thread creation, and locking.

from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase(
    'my_app.db',
    use_gevent=False,  # Use the standard library "threading" module.
    autostart=False,  # The worker thread now must be started manually.
    queue_max_size=64,  # Max. # of pending writes that can accumulate.
    results_timeout=5.0)  # Max. time to wait for query to be executed.

If autostart=False, as in the above example, you will need to call start() to bring up the worker thread that does the actual write query execution.

def _start_worker_threads():
    db.start()  # Spawn the writer thread.

If you plan on performing SELECT queries, or otherwise need to access the database, you will need to call connect() and close() as you would with any other database instance.

When your application is ready to terminate, use the stop() method to shut down the worker thread. If there was a backlog of work, then this method will block until all pending work is finished (though no new work is allowed).

import atexit

@atexit.register
def _stop_worker_threads():
    db.stop()  # Blocks until pending writes have been flushed.

Lastly, the is_stopped() method can be used to determine whether the database writer is up and running.

Sqlite User-Defined Functions

The sqlite_udf playhouse module contains a number of user-defined functions, aggregates, and table-valued functions, which you may find useful. The functions are grouped in collections and you can register these user-defined extensions individually, by collection, or register everything.

Scalar functions are functions which take a number of parameters and return a single value. For example, converting a string to upper-case, or calculating the MD5 hex digest.

Aggregate functions are like scalar functions that operate on multiple rows of data, producing a single result. For example, calculating the sum of a list of integers, or finding the smallest value in a particular column.

Table-valued functions are simply functions that can return multiple rows of data. For example, a regular-expression search function that returns all the matches in a given string, or a function that accepts two dates and generates all the intervening days.
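To make the distinction between scalar and aggregate functions concrete, here is a minimal sketch that uses the standard library sqlite3 module directly rather than playhouse.sqlite_udf. The hostname implementation, the ValueRange aggregate and the link table are stand-ins written for this example, not the library's own code. Table-valued functions are omitted because the standard library driver has no hook for them.

```python
import sqlite3
from urllib.parse import urlparse


def hostname(url):
    # Scalar function: one input value in, one value out.
    return urlparse(url).netloc if url else None


class ValueRange:
    # Aggregate: step() is called once per row, finalize() once per group.
    def __init__(self):
        self.lo = None
        self.hi = None

    def step(self, value):
        if value is None:
            return
        self.lo = value if self.lo is None else min(self.lo, value)
        self.hi = value if self.hi is None else max(self.hi, value)

    def finalize(self):
        if self.lo is None:
            return None
        return self.hi - self.lo


conn = sqlite3.connect(':memory:')
conn.create_function('hostname', 1, hostname)
conn.create_aggregate('value_range', 1, ValueRange)

conn.execute('CREATE TABLE link (url TEXT, hits INTEGER)')
conn.executemany('INSERT INTO link VALUES (?, ?)', [
    ('http://example.com/a', 3),
    ('http://example.com/b', 10),
    ('http://example.org/', 7)])

# Count links per host, most common first.
hosts = conn.execute(
    'SELECT hostname(url), COUNT(*) FROM link '
    'GROUP BY 1 ORDER BY 2 DESC').fetchall()

# Spread between the smallest and largest hit count.
spread = conn.execute('SELECT value_range(hits) FROM link').fetchone()[0]
print(hosts, spread)
```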


To use table-valued functions, you will need to build the playhouse._sqlite_ext C extension.

Registering user-defined functions:

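from peewee import SqliteDatabase
from playhouse.sqlite_udf import (
    register_all, register_groups, register_aggregate_groups)

db = SqliteDatabase('my_app.db')

# Register *all* functions.
register_all(db)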

# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')

# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')

# If you only wish to register a single function, then you can:
from playhouse.sqlite_udf import gzip, gunzip
db.register_function(gzip, 'gzip')
db.register_function(gunzip, 'gunzip')

Using a library function (“hostname”):

# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
         .select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
         .group_by(fn.hostname(Link.url))
         .order_by(fn.COUNT(Link.id).desc())
         .tuples())

# Print the hostname along with number of links associated with it.
for host, count in query:
    print('%s: %s' % (host, count))

Functions, listed by collection name

Scalar functions are indicated by (f), aggregate functions by (a), and table-valued functions by (t).


if_then_else(cond, truthy[, falsey=None])

Simple ternary-type operator, where, depending on the truthiness of the cond parameter, either the truthy or falsey value will be returned.


strip_tz(date_str)

Parameters: date_str – A datetime, encoded as a string.
Returns: The datetime with any timezone info stripped off.

The time is not adjusted in any way; the timezone is simply removed.

humandelta(nseconds[, glue=', '])
  • nseconds (int) – Number of seconds, total, in timedelta.
  • glue (str) – Fragment to join values.

Easy-to-read description of timedelta.

Example, 86471 -> “1 day, 1 minute, 11 seconds”
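The conversion can be sketched in a few lines of plain Python. This is an illustrative re-implementation, not the code from playhouse.sqlite_udf; the set of units and the 365-day year are assumptions made for the example.

```python
def humandelta(nseconds, glue=', '):
    # Largest unit first; each unit is expressed in seconds.
    units = (
        ('year', 365 * 86400),
        ('day', 86400),
        ('hour', 3600),
        ('minute', 60),
        ('second', 1),
    )
    remaining = abs(int(nseconds))
    parts = []
    for name, size in units:
        count, remaining = divmod(remaining, size)
        if count:
            suffix = '' if count == 1 else 's'
            parts.append('%d %s%s' % (count, name, suffix))
    return glue.join(parts)


print(humandelta(86471))
```

For 86471 seconds the loop takes out one day (86400), leaving 71 seconds, which is one minute and 11 seconds.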

mintdiff(datetime_value)

Parameters: datetime_value – A date-time.
Returns: Minimum difference between any two values in list.

Aggregate function that computes the minimum difference between any two datetimes.

avgtdiff(datetime_value)

Parameters: datetime_value – A date-time.
Returns: Average difference between values in list.

Aggregate function that computes the average difference between consecutive values in the list.

duration(datetime_value)

Parameters: datetime_value – A date-time.
Returns: Duration from smallest to largest value in list, in seconds.

Aggregate function that computes the duration from the smallest to the largest value in the list, returned in seconds.

date_series(start, stop[, step_seconds=86400])
  • start (datetime) – Start datetime
  • stop (datetime) – Stop datetime
  • step_seconds (int) – Number of seconds comprising a step.

Table-valued function that returns rows consisting of the date and/or time values encountered iterating from start to stop, step_seconds at a time.

Additionally, if start does not have a time component and step_seconds is greater-than-or-equal-to one day (86400 seconds), the values returned will be dates. Conversely, if start does not have a date component, values will be returned as times. Otherwise values are returned as datetimes.


SELECT * FROM date_series('2017-01-28', '2017-02-02');
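The iteration behind a series like this can be sketched as a plain Python generator. This is not the playhouse implementation: it takes date or datetime objects instead of strings, and it assumes the stop value is included in the output.

```python
from datetime import date, timedelta


def date_series(start, stop, step_seconds=86400):
    # Yield start, start + step, ... up to and including stop (assumed).
    step = timedelta(seconds=step_seconds)
    current = start
    while current <= stop:
        yield current
        current += step


days = list(date_series(date(2017, 1, 28), date(2017, 2, 2)))
for day in days:
    print(day.isoformat())
```

With the default step of one day, the call above walks from 28 January across the month boundary to 2 February.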



file_ext(filename)

Parameters: filename (str) – Filename to extract extension from.
Returns: The file extension, including the leading “.”.

file_read(filename)

Parameters: filename (str) – Filename to read.
Returns: Contents of the file.


gzip(data[, compression=9])
  • data (bytes) – Data to compress.
  • compression (int) – Compression level (9 is max).

Compressed binary data.

gunzip(data)

Parameters: data (bytes) – Compressed data.
Returns: Uncompressed binary data.

hostname(url)

Parameters: url (str) – URL to extract hostname from.
Returns: Hostname portion of URL.

toggle(key)

Parameters: key – Key to toggle.

Toggle a key between True/False state. Example:

>>> toggle('my-key')
>>> toggle('my-key')
>>> toggle('my-key')

setting(key[, value=None])
  • key – Key to set/retrieve.
  • value – Value to set.

Value associated with key.

Store/retrieve a setting in memory and persist during lifetime of application. To get the current value, only specify the key. To set a new value, call with key and new value.


clear_toggles()

Clears all state associated with the toggle() function.

clear_settings()

Clears all state associated with the setting() function.


randomrange(start[, stop=None[, step=None]])
  • start (int) – Start of range (inclusive).
  • stop (int) – End of range (not inclusive).
  • step (int) – Interval at which to return a value.

Return a random integer in the half-open range [start, stop).

gauss_distribution(mean, sigma)
  • mean (float) – Mean value
  • sigma (float) – Standard deviation

sqrt(n)

Calculate the square root of n.

tonumber(s)

Parameters: s (str) – String to convert to number.
Returns: Integer, floating-point or NULL on failure.

mode(val)

Parameters: val – Numbers in list.
Returns: The mode, or most-common, number observed.

Aggregate function which calculates mode of values.

minrange(val)

Parameters: val – Value
Returns: Min difference between two values.

Aggregate function which calculates the minimal distance between two numbers in the sequence.

avgrange(val)

Parameters: val – Value
Returns: Average difference between values.

Aggregate function which calculates the average distance between two consecutive numbers in the sequence.

range(val)

Parameters: val – Value
Returns: The range from the smallest to largest value in sequence.

Aggregate function which returns range of values observed.

median(val)

Parameters: val – Value
Returns: The median, or middle, value in a sequence.

Aggregate function which calculates the middle value in a sequence.


Only available if you compiled the _sqlite_udf extension.


substr_count(haystack, needle)

Returns number of times needle appears in haystack.

strip_chars(haystack, chars)

Strips any characters in chars from beginning and end of haystack.

damerau_levenshtein_dist(s1, s2)

Computes the edit distance from s1 to s2 using the Damerau variant of the Levenshtein algorithm.


Only available if you compiled the _sqlite_udf extension.

levenshtein_dist(s1, s2)

Computes the edit distance from s1 to s2 using the Levenshtein algorithm.


Only available if you compiled the _sqlite_udf extension.
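For readers unfamiliar with the metric behind levenshtein_dist, the following pure-Python sketch shows the classic dynamic-programming formulation. It is written for illustration only and is not the compiled implementation that the extension provides; the function name levenshtein is chosen for this example.

```python
def levenshtein(s1, s2):
    # Keep the shorter string as the inner dimension to save memory.
    if len(s1) < len(s2):
        s1, s2 = s2, s1
    # previous[j] holds the distance between the processed prefix of s1
    # and the first j characters of s2.
    previous = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1, start=1):
        current = [i]
        for j, c2 in enumerate(s2, start=1):
            insert_cost = current[j - 1] + 1
            delete_cost = previous[j] + 1
            replace_cost = previous[j - 1] + (c1 != c2)
            current.append(min(insert_cost, delete_cost, replace_cost))
        previous = current
    return previous[-1]


print(levenshtein('kitten', 'sitting'))
```

The Damerau variant additionally counts the transposition of two adjacent characters as a single edit, which this sketch does not do.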

str_dist(s1, s2)

Computes the edit distance from s1 to s2 using the standard library SequenceMatcher’s algorithm.


Only available if you compiled the _sqlite_udf extension.

regex_search(regex, search_string)
  • regex (str) – Regular expression
  • search_string (str) – String to search for instances of regex.

Table-valued function that searches a string for substrings that match the provided regex. Returns rows for each match found.


SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols');
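The row-per-match behaviour can be mimicked in plain Python with a generator built on the standard re module. This is only a sketch of the idea, not the table-valued function itself, and it reuses the name regex_search purely for readability.

```python
import re


def regex_search(regex, search_string):
    # Yield one value per non-overlapping match, in order of appearance.
    for match in re.finditer(regex, search_string):
        yield match.group(0)


words = list(regex_search(r'\w+', 'extract words, ignore! symbols'))
print(words)
```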


apsw, an advanced sqlite driver

The apsw_ext module contains a database class suitable for use with the apsw sqlite driver.

APSW Project page:

APSW is a really neat library that provides a thin wrapper on top of SQLite’s C interface, making it possible to use all of SQLite’s advanced features.

Here are just a few reasons to use APSW, taken from the documentation:

  • APSW gives all functionality of SQLite, including virtual tables, virtual file system, blob i/o, backups and file control.
  • Connections can be shared across threads without any additional locking.
  • Transactions are managed explicitly by your code.
  • APSW can handle nested transactions.
  • Unicode is handled correctly.
  • APSW is faster.

For more information on the differences between apsw and pysqlite, check the apsw docs.

How to use the APSWDatabase

from playhouse.apsw_ext import *

db = APSWDatabase(':memory:')

class BaseModel(Model):
    class Meta:
        database = db

class SomeModel(BaseModel):
    col1 = CharField()
    col2 = DateTimeField()

apsw_ext API notes

APSWDatabase extends the SqliteExtDatabase and inherits its advanced features.

class APSWDatabase(database, **connect_kwargs)
  • database (string) – filename of sqlite database
  • connect_kwargs – keyword arguments passed to apsw when opening a connection

register_module(mod_name, mod_inst)

Provides a way of globally registering a module. For more information, see the documentation on virtual tables.

  • mod_name (string) – name to use for module
  • mod_inst (object) – an object implementing the Virtual Table interface

unregister_module(mod_name)

Unregister a module.

Parameters: mod_name (string) – name of the module to unregister


Be sure to use the Field subclasses defined in the apsw_ext module, as they will properly handle adapting the data types for storage.

For example, instead of using peewee.DateTimeField, be sure you are importing and using playhouse.apsw_ext.DateTimeField.

Sqlcipher backend

  • Although this extension’s code is short, it has not been properly peer-reviewed yet and may have introduced vulnerabilities.

Also note that this code relies on pysqlcipher and sqlcipher, and the code there might have vulnerabilities as well, but since these are widely used crypto modules, we can expect “short zero days” there.

sqlcipher_ext API notes

class SqlCipherDatabase(database, passphrase, **kwargs)

Subclass of SqliteDatabase that stores the database encrypted. Instead of the standard sqlite3 backend, it uses pysqlcipher, a Python wrapper for sqlcipher, which in turn is an encrypted wrapper around sqlite3. The API is therefore identical to SqliteDatabase’s, except for the object construction parameters:

  • database – Path to encrypted database filename to open [or create].
  • passphrase – Database encryption passphrase: should be at least 8 characters long, but it is strongly advised to enforce better passphrase strength criteria in your implementation.
  • If the database file doesn’t exist, it will be created with encryption by a key derived from passphrase.
  • When trying to open an existing database, passphrase should be identical to the one used when it was created. If the passphrase is incorrect, an error will be raised when first attempting to access the database.

rekey(passphrase)

Parameters: passphrase (str) – New passphrase for database.

Change the passphrase for database.


SQLCipher can be configured using a number of extension PRAGMAs. The list of PRAGMAs and their descriptions can be found in the SQLCipher documentation.

For example to specify the number of PBKDF2 iterations for the key derivation (64K in SQLCipher 3.x, 256K in SQLCipher 4.x by default):

# Use 1,000,000 iterations.
db = SqlCipherDatabase('my_app.db', passphrase='secret!!!',
                       pragmas={'kdf_iter': 1000000})
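To give a feel for what the iteration count controls, the sketch below derives a key from a passphrase with PBKDF2 using only the standard library. It does not reproduce SQLCipher's key derivation: the hash algorithm (SHA-512), the 16-byte random salt, the 32-byte key length and the derive_key helper are all assumptions made for illustration.

```python
import hashlib
import os


def derive_key(passphrase, salt, iterations):
    # More iterations make every passphrase guess proportionally slower.
    return hashlib.pbkdf2_hmac(
        'sha512', passphrase.encode('utf-8'), salt, iterations, dklen=32)


salt = os.urandom(16)
key_64k = derive_key('secret!!!', salt, 64000)
key_256k = derive_key('secret!!!', salt, 256000)

# The same passphrase and salt give different keys for different counts,
# so the setting has to match the one used when the key was first derived.
print(key_64k != key_256k)
```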

To use a cipher page-size of 16KB and a cache-size of 10,000 pages:

db = SqlCipherDatabase('my_app.db', passphrase='secret!!!', pragmas={
    'cipher_page_size': 1024 * 16,
    'cache_size': 10000})  # 10,000 16KB pages, or 160MB.

Example of prompting the user for a passphrase:

db = SqlCipherDatabase(None)

class BaseModel(Model):
    """Parent for all app's models"""
    class Meta:
        # We won't have a valid db until user enters passphrase.
        database = db

# Derive our model subclasses
class Person(BaseModel):
    name = TextField(primary_key=True)

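from getpass import getpass  # Stdlib prompt; swap in your own UI.

right_passphrase = False
while not right_passphrase:
    # 'my_app.db' is a placeholder filename.
    db.init('my_app.db', passphrase=getpass('Passphrase: '))

    try:  # Actually execute a query against the db to test passphrase.
        db.get_tables()
    except DatabaseError as exc:
        # This error indicates the password was wrong.
        if exc.args[0] == 'file is encrypted or is not a database':
            print('Incorrect passphrase, please try again.')
            db.init(None)  # Reset the db.
        else:
            raise exc
    else:
        # The password was correct.
        right_passphrase = True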

See also: a slightly more elaborate example.

Postgresql Extensions

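The postgresql extensions module provides a number of “postgres-only” features, currently including JSON support, hstore support, interval support, server-side cursors, and the ArrayField and DateTimeTZField field types.

In the future I would like to add support for more of postgresql’s features. If there is a particular feature you would like to see added, please open a GitHub issue.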


In order to start using the features described below, you will need to use the extension PostgresqlExtDatabase class instead of PostgresqlDatabase.

The code below will assume you are using the following database and base model:

from playhouse.postgres_ext import *

ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')

class BaseExtModel(Model):
    class Meta:
        database = ext_db

JSON Support

peewee has basic support for Postgres’ native JSON data type, in the form of JSONField. As of version 2.4.7, peewee also supports the Postgres 9.4 binary JSON type (jsonb), via BinaryJSONField.


Postgres supports a JSON data type natively as of 9.2 (full support in 9.3). In order to use this functionality you must be using the correct version of Postgres with psycopg2 version 2.5 or greater.

To use BinaryJSONField, which has many performance and querying advantages, you must have Postgres 9.4 or later.


You must be sure your database is an instance of PostgresqlExtDatabase in order to use the JSONField.

Here is an example of how you might declare a model with a JSON field:

import json
import urllib2
from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_database')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

    @classmethod
    def request(cls, url):
        fh = urllib2.urlopen(url)
        return cls.create(url=url, response=json.loads(fh.read()))

# Store a JSON response.
offense = APIResponse.request('')
booking = APIResponse.request('')

# Query a JSON data structure using a nested key lookup:
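offense_responses = APIResponse.select().where(
    APIResponse.response['meta']['model'] == 'offense')

# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON. The ['booking']['person'] path is
# illustrative and depends on the shape of the stored documents.
q = (APIResponse
     .select(
         APIResponse.response['booking']['person'].as_json().alias('person'))
     .where(APIResponse.response['meta']['model'] == 'booking'))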

for result in q:
    print(result.person['name'], result.person['dob'])

The BinaryJSONField works the same and supports the same operations as the regular JSONField, but provides several additional operations for testing containment. Using the binary json field, you can test whether your JSON data contains other partial JSON structures (contains(), contains_any(), contains_all()), or whether it is a subset of a larger JSON document (contained_by()).
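The containment rules can be approximated in plain Python, which may help when reasoning about which rows a query will match. The sketch below is an illustration of the semantics only: it is not how peewee or Postgres evaluates these operators, it ignores special cases (for example a top-level array containing a bare scalar), and the function names are chosen for this example.

```python
def contains(document, fragment):
    # Objects: every key in the fragment must exist and contain its value.
    if isinstance(fragment, dict):
        return (isinstance(document, dict) and
                all(key in document and contains(document[key], value)
                    for key, value in fragment.items()))
    # Arrays: every fragment element must be contained by some element
    # of the document array; order and duplicates do not matter.
    if isinstance(fragment, list):
        return (isinstance(document, list) and
                all(any(contains(item, wanted) for item in document)
                    for wanted in fragment))
    # Scalars must match exactly.
    return document == fragment


def contained_by(document, bigger):
    # The inverse relation: is document a subset of bigger?
    return contains(bigger, document)


doc = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {'huey': 'mickey', 'peewee': 'nugget'},
    },
}

print(contains(doc, {'foo': {'bar': ['i2', 'i1']}}))
print(contains(doc, {'foo': {'bar': ['ix']}}))
print(contained_by({'foo': {'baz': {'huey': 'mickey'}}}, doc))
```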

For more examples, see the JSONField and BinaryJSONField API documents below.

hstore support

Postgresql hstore is an embedded key/value store. With hstore, you can store arbitrary key/value pairs in your database alongside structured relational data.

To use hstore, you need to specify an additional parameter when instantiating your PostgresqlExtDatabase:

# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)

Currently the postgres_ext module supports the following operations:

  • Store and retrieve arbitrary dictionaries
  • Filter by key(s) or partial dictionary
  • Update/add one or more keys to an existing dictionary
  • Delete one or more keys from an existing dictionary
  • Select keys, values, or zip keys and values
  • Retrieve a slice of keys/values
  • Test for the existence of a key
  • Test that a key has a non-NULL value

Using hstore

To start with, you will need to import the custom database class and the hstore functions from playhouse.postgres_ext (see above code snippet). Then, it is as simple as adding an HStoreField to your model:

class House(BaseExtModel):
    address = CharField()
    features = HStoreField()

You can now store arbitrary key/value pairs on House instances:

>>> h = House.create(
...     address='123 Main St',
...     features={'garage': '2 cars', 'bath': '2 bath'})
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}

You can filter by individual key, multiple keys or partial dictionary:

>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))

Suppose you want to do an atomic update to the house:

>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}

Or, alternatively an atomic delete:

>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}

Multiple keys can be deleted at the same time:

>>> query = House.update(features=House.features.delete('garage', 'sqft'))

You can select just keys, just values, or zip the two:

>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)

123 Main St [u'bath', u'garage']

>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)

123 Main St [u'2 bath', u'2 cars']

>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

You can retrieve a slice of data, for example, all the garage data:

>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
...     print(house.address, house.garage_data)

123 Main St {'garage': '2 cars'}

You can check for the existence of a key and filter rows accordingly:

>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
...     print(house.address, house.has_garage)

123 Main St True

>>> for house in House.select().where(House.features.exists('garage')):
...     print(house.address, house.features['garage'])  # <-- just houses w/garage data

123 Main St 2 cars

Interval support

Postgres supports durations through the INTERVAL data-type (docs).

class IntervalField([null=False[, ...]])

Field class capable of storing Python datetime.timedelta instances.


from datetime import timedelta

from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_db')

class Event(Model):
    location = CharField()
    duration = IntervalField()
    start_time = DateTimeField()

    class Meta:
        database = db

    @classmethod
    def get_long_meetings(cls):
        return cls.select().where(cls.duration > timedelta(hours=1))

Server-side cursors

When psycopg2 executes a query, normally all results are fetched and returned to the client by the backend. This can cause your application to use a lot of memory when making large queries. Using server-side cursors, results are returned a little at a time (by default 2000 records). For the definitive reference, please see the psycopg2 documentation.


To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.

To execute a query using a server-side cursor, simply wrap your select query using the ServerSide() helper:

large_query = PageView.select()  # Build query normally.

# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
    # do some interesting analysis here.
    pass

# Server-side resources are released.

If you would like all SELECT queries to automatically use a server-side cursor, you can specify this when creating your PostgresqlExtDatabase:

from playhouse.postgres_ext import PostgresqlExtDatabase

ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)


Server-side cursors live only as long as the transaction, so for this reason peewee will not automatically call commit() after executing a SELECT query. If you do not commit after you are done iterating, you will not release the server-side resources until the connection is closed (or the transaction is committed later). Furthermore, since peewee will by default cache rows returned by the cursor, you should always call .iterator() when iterating over a large query.

If you are using the ServerSide() helper, the transaction and call to iterator() will be handled transparently.

postgres_ext API notes

class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=False[, ...]]])

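Identical to PostgresqlDatabase but required in order to support the extension features described in this section, such as JSONField, HStoreField, ArrayField and server-side cursors.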

  • database (str) – Name of database to connect to.
  • server_side_cursors (bool) – Whether SELECT queries should utilize server-side cursors.
  • register_hstore (bool) – Register the HStore extension with the connection.

If you wish to use the HStore extension, you must specify register_hstore=True.

If using server_side_cursors, also be sure to wrap your queries with ServerSide().

ServerSide(select_query)

Parameters: select_query – a SelectQuery instance.
Return type: generator

Wrap the given select query in a transaction, and call its iterator() method to avoid caching row instances. In order for the server-side resources to be released, be sure to exhaust the generator (iterate over all the rows).


large_query = PageView.select()
for page_view in ServerSide(large_query):
    # Do something interesting.
    pass

# At this point server side resources are released.

class ArrayField([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
  • field_class – a subclass of Field, e.g. IntegerField.
  • field_kwargs (dict) – arguments to initialize field_class.
  • dimensions (int) – dimensions of array.
  • convert_values (bool) – apply field_class value conversion to array data.

Field capable of storing arrays of the provided field_class.


By default ArrayField will use a GIN index. To disable this, initialize the field with index=False.

You can store and retrieve lists (or lists-of-lists):

class BlogPost(BaseExtModel):
    content = TextField()
    tags = ArrayField(CharField)

post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])

Additionally, you can use the __getitem__ API to query values or slices in the database:

# Get the first tag on a given blog post.
first_tag = (BlogPost
             .select(BlogPost.tags[0].alias('first_tag'))
             .where(BlogPost.id == 1)
             .dicts()
             .get())

# first_tag = {'first_tag': 'foo'}

Get a slice of values:

# Get the first two tags.
two_tags = (BlogPost
            .select(BlogPost.tags[:2].alias('two'))
            .dicts()
            .get())
# two_tags = {'two': ['foo', 'bar']}

contains(*items)

Parameters: items – One or more items that must be in the given array field.

# Get all blog posts that are tagged with both "python" and "django".
BlogPost.select().where(BlogPost.tags.contains('python', 'django'))

contains_any(*items)

Parameters: items – One or more items to search for in the given array field.

Like contains(), except will match rows where the array contains any of the given items.

# Get all blog posts that are tagged with "flask" and/or "django".
BlogPost.select().where(BlogPost.tags.contains_any('flask', 'django'))

class DateTimeTZField(*args, **kwargs)

A timezone-aware subclass of DateTimeField.

class HStoreField(*args, **kwargs)

A field for storing and retrieving arbitrary key/value pairs. For details on usage, see hstore support.


To use the HStoreField you will need to be sure the hstore extension is registered with the connection. To accomplish this, instantiate the PostgresqlExtDatabase with register_hstore=True.


By default HStoreField will use a GiST index. To disable this, initialize the field with index=False.


keys()

Returns the keys for a given row.

>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)

123 Main St [u'bath', u'garage']

values()

Return the values for a given row.

>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)

123 Main St [u'2 bath', u'2 cars']

items()

Like python’s dict, return the keys and values in a list-of-lists:

>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

slice(*args)

Return a slice of data given a list of keys.

>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')):
...     print(h.address, h.garage_data)

123 Main St {'garage': '2 cars'}

exists(key)

Query for whether the given key exists.

>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')):
...     print(h.address, h.has_garage)

123 Main St True

>>> for h in House.select().where(House.features.exists('garage')):
...     print(h.address, h.features['garage']) # <-- just houses w/garage data

123 Main St 2 cars

defined(key)

Query for whether the given key has a value associated with it.


update(**data)

Perform an atomic update to the keys/values for a given row or rows.

>>> query = House.update(features=House.features.update(
...     sqft=2000,
...     year_built=2012))
>>> query.where(House.id == 1).execute()

delete(*keys)

Delete the provided keys for a given row or rows.

We will use an UPDATE query.

>>> query = House.update(features=House.features.delete(
...     'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()

contains(value)

Parameters: value – Either a dict, a list of keys, or a single key.

Query rows for the existence of either:

  • a partial dictionary.
  • a list of keys.
  • a single key.

>>> query = House.select()
>>> has_garage = query.where(House.features.contains('garage'))
>>> garage_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))

contains_any(*keys)

Parameters: keys – One or more keys to search for.

Query rows for the existence of any key.

class JSONField(dumps=None, *args, **kwargs)
Parameters:dumps – The default is to call json.dumps() or the dumps function. You can override this method to create a customized JSON wrapper.

Field class suitable for storing and querying arbitrary JSON. When using this on a model, set the field’s value to a Python object (either a dict or a list). When you retrieve your value from the database it will be returned as a Python data structure.


You must be using Postgres 9.2 / psycopg2 2.5 or greater.


If you are using Postgres 9.4, strongly consider using the BinaryJSONField instead as it offers better performance and more powerful querying options.

Example model declaration:

db = PostgresqlExtDatabase('my_db')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

Example of storing JSON data:

url = ''
resp = json.loads(urllib2.urlopen(url).read())
APIResponse.create(url=url, response=resp)

APIResponse.create(url='', response={'key': 'value'})

To query, use Python’s [] operators to specify nested key or array lookups:

APIResponse.select().where(
    APIResponse.response['key1']['nested-key'] == 'some-value')

To illustrate the use of the [] operators, imagine we have the following data stored in an APIResponse:

{
  "foo": {
    "bar": ["i1", "i2", "i3"],
    "baz": {
      "huey": "mickey",
      "peewee": "nugget"
    }
  }
}

Here are the results of a few queries:

def get_data(expression):
    # Helper function to just retrieve the results of a
    # particular expression.
    query = (APIResponse
             .select(expression.alias('my_data'))
             .dicts()
             .get())
    return query['my_data']

# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.response['foo']['bar'])
# '["i1", "i2", "i3"]'

# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.response['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']

# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.response['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'

# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.response['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}

# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.response['foo']['bar'][0])
# 'i1'

# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.response['foo']['bar'][0].as_json())
# 'i1'

class BinaryJSONField(dumps=None, *args, **kwargs)
Parameters: dumps – A callable used to serialize values to JSON. Defaults to json.dumps(); pass your own function to customize JSON serialization.

Store and query arbitrary JSON documents. Data should be stored using normal Python dict and list objects, and when data is returned from the database, it will be returned using dict and list as well.

For examples of basic query operations, see the above code samples for JSONField. The example queries below will use the same APIResponse model described above.


By default BinaryJSONField will use a GIN index. To disable this, initialize the field with index=False.


You must be using Postgres 9.4 / psycopg2 2.5 or newer. If you are using Postgres 9.2 or 9.3, you can use the regular JSONField instead.


contains(other)

Test whether the given JSON data contains the given JSON fragment or key.

search_fragment = {
    'foo': {'bar': ['i2']}
}
query = (APIResponse
         .select()
         .where(APIResponse.response.contains(search_fragment)))

# If we're searching for a list, the list items do not need to
# be ordered in a particular way:
query = (APIResponse
         .select()
         .where(APIResponse.response.contains({
             'foo': {'bar': ['i2', 'i1']}})))

We can pass in simple keys as well. To find APIResponses that contain the key foo at the top-level:

APIResponse.select().where(APIResponse.response.contains('foo'))

We can also search sub-keys using square-brackets:

APIResponse.select().where(
    APIResponse.response['foo']['bar'].contains(['i2', 'i1']))
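
The containment rules used by contains() can be hard to picture from the queries alone. The following is a minimal pure-Python sketch of those rules, assuming they follow PostgreSQL's JSONB containment semantics. The jsonb_contains() helper is hypothetical and is not part of Peewee, and it ignores PostgreSQL's special case that lets an array contain a bare primitive value.

```python
def jsonb_contains(doc, fragment):
    """Return True if `doc` structurally contains `fragment` (sketch only)."""
    if isinstance(fragment, dict):
        # Every key of the fragment must exist in the document, and the
        # corresponding values must be contained recursively.
        return (isinstance(doc, dict) and
                all(key in doc and jsonb_contains(doc[key], value)
                    for key, value in fragment.items()))
    if isinstance(fragment, list):
        # Each fragment item must match some document item; ordering and
        # duplicates are ignored.
        return (isinstance(doc, list) and
                all(any(jsonb_contains(item, wanted) for item in doc)
                    for wanted in fragment))
    # Scalars must simply be equal.
    return doc == fragment


data = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {'huey': 'mickey', 'peewee': 'nugget'},
    },
}

print(jsonb_contains(data, {'foo': {'bar': ['i2']}}))        # True
print(jsonb_contains(data, {'foo': {'bar': ['i2', 'i1']}}))  # True
print(jsonb_contains(data, {'foo': {'bar': ['ix']}}))        # False
```

In a real application the database evaluates the containment test; the sketch only mirrors the matching rules for documents already loaded into Python.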

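contains_any(*items)

Search for the presence of one or more of the given items.

APIResponse.select().where(
    APIResponse.response.contains_any('foo', 'baz', 'nugget'))

Like contains(), we can also search sub-keys:

APIResponse.select().where(
    APIResponse.response['foo']['bar'].contains_any('i2', 'ix'))

contains_all(*items)

Search for the presence of all of the given items.

APIResponse.select().where(
    APIResponse.response.contains_all('foo'))

Like contains_any(), we can also search sub-keys:

APIResponse.select().where(
    APIResponse.response['foo']['bar'].contains_all('i1', 'i2', 'i3'))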

contained_by(other)

Test whether the given JSON document is contained by (is a subset of) the given JSON document. This method is the inverse of contains().

big_doc = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {
            'huey': 'mickey',
            'peewee': 'nugget',
        }
    },
    'other_key': ['nugget', 'bear', 'kitten'],
}
APIResponse.select().where(
    APIResponse.response.contained_by(big_doc))

concat(data)

Concatenate the field data and the provided data. Note that this operation does not merge nested values or do a “deep concat”.
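
To make the difference between a shallow concatenation and a deep merge concrete, here is a small pure-Python sketch. It assumes the concatenation behaves like PostgreSQL's JSONB || operator on two objects, where a duplicate top-level key takes the right-hand value wholesale. Both helper functions are hypothetical illustrations and are not part of Peewee.

```python
def shallow_concat(left, right):
    # Top-level keys from `right` replace those in `left` wholesale;
    # nested dictionaries are NOT merged.
    return {**left, **right}


def deep_merge(left, right):
    # Shown for contrast only: recursively merge nested dictionaries.
    merged = dict(left)
    for key, value in right.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


stored = {'foo': {'bar': 1, 'baz': 2}, 'x': 1}
update = {'foo': {'bar': 99}}

print(shallow_concat(stored, update))  # {'foo': {'bar': 99}, 'x': 1}
print(deep_merge(stored, update))      # {'foo': {'bar': 99, 'baz': 2}, 'x': 1}
```

Note how the shallow version drops the nested "baz" key, which is the behavior the warning above describes.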


has_key(key)

Test whether the key exists at the top-level of the JSON object.

remove(*keys)

Remove one or more keys from the top-level of the JSON object.

Match(field, query)

Generate a full-text search expression, automatically converting the left-hand operand to a tsvector, and the right-hand operand to a tsquery.


def blog_search(search_term):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, search_term))
class TSVectorField

Field type suitable for storing tsvector data. This field will automatically be created with a GIN index for improved search performance.


Data stored in this field will still need to be manually converted to the tsvector type.


By default TSVectorField will use a GIN index. To disable this, initialize the field with index=False.

Example usage:

class Blog(Model):
    content = TextField()
    search_content = TSVectorField()

content = 'this is a sample blog entry.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))  # Note `to_tsvector()`.
match(query[, language=None[, plain=False]])
  • query (str) – the full-text search query.
  • language (str) – language name (optional).
  • plain (bool) – parse search query using plain (simple) parser.

Returns: an expression representing full-text search/match.

# Perform a search using the "match" method.
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))

Cockroach Database

CockroachDB (CRDB) is well supported by peewee.

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', host='')

The playhouse.cockroachdb extension module provides the following classes and helpers:

Special field-types that may be useful when using CRDB:

  • UUIDKeyField - a primary-key field implementation that uses CRDB’s UUID type with a default randomly-generated UUID.
  • RowIDField - a primary-key field implementation that uses CRDB’s INT type with a default unique_rowid().
  • JSONField - same as the Postgres BinaryJSONField, as CRDB treats JSON as JSONB.
  • ArrayField - same as the Postgres extension (but does not support multi-dimensional arrays).

CRDB is compatible with Postgres’ wire protocol and exposes a very similar SQL interface, so it is possible to use PostgresqlDatabase with CRDB. This is not recommended, however, for the following reasons:

  1. CRDB does not support nested transactions (savepoints), so the atomic() method has been implemented to enforce this when using CockroachDatabase. For more info, see CRDB Transactions.
  2. CRDB may have subtle differences in field-types, date functions and introspection from Postgres.
  3. CRDB-specific features are exposed by the CockroachDatabase, such as specifying a transaction priority or the AS OF SYSTEM TIME clause.

CRDB Transactions

CRDB does not support nested transactions (savepoints), so the atomic() method on the CockroachDatabase has been modified to raise an exception if an invalid nesting is encountered. If you would like to be able to nest transactional code, you can use the transaction() method, which will ensure that the outer-most block will manage the transaction (e.g., exiting a nested-block will not cause an early commit).


@db.transaction()
def create_user(username):
    return User.create(username=username)

def some_other_function():
    with db.transaction() as txn:
        # do some stuff...

        # This function is wrapped in a transaction, but the nested
        # transaction will be ignored and folded into the outer
        # transaction, as we are already in a wrapped-block (via the
        # context manager).
        create_user('some_user')  # Illustrative username.

        # do other stuff.

    # At this point we have exited the outer-most block and the transaction
    # will be committed.
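
The folding behavior described above can be pictured with a small depth counter. This is only a sketch of the idea, not the CockroachDatabase implementation: FakeDatabase is a hypothetical stand-in that records what a real connection would be asked to do.

```python
from contextlib import contextmanager


class FakeDatabase:
    """Hypothetical stand-in that records BEGIN/COMMIT/ROLLBACK."""

    def __init__(self):
        self.depth = 0
        self.log = []

    @contextmanager
    def transaction(self):
        # Only the outer-most block actually manages the transaction.
        outermost = self.depth == 0
        if outermost:
            self.log.append('BEGIN')
        self.depth += 1
        try:
            yield
        except Exception:
            if outermost:
                self.log.append('ROLLBACK')
            raise
        else:
            if outermost:
                self.log.append('COMMIT')
        finally:
            self.depth -= 1


db = FakeDatabase()
with db.transaction():
    db.log.append('outer work')
    with db.transaction():  # Folded into the outer transaction.
        db.log.append('inner work')

print(db.log)  # ['BEGIN', 'outer work', 'inner work', 'COMMIT']
```

Exiting the inner block emits nothing, so there is no early commit; the single COMMIT appears only when the outer block exits.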

CRDB provides client-side transaction retries, which are available using a special run_transaction() helper. This helper method accepts a callable, which is responsible for executing any transactional statements that may need to be retried.

Simplest possible example of run_transaction():

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('')


The cockroachdb.ExceededMaxAttempts exception will be raised if the transaction cannot be committed after the given number of attempts. If the SQL is malformed, violates a constraint, etc., then the function will raise the exception to the caller.
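
The general shape of a client-side retry loop is sketched below in plain Python. It is an illustration under stated assumptions rather than Peewee's implementation: run_with_retries() and RetryableError are hypothetical names, and the ExceededMaxAttempts class defined here is only a local stand-in for the exception mentioned above.

```python
class ExceededMaxAttempts(Exception):
    """Local stand-in for the exception of the same name."""


class RetryableError(Exception):
    """Hypothetical marker for a failure caused by contention."""


def run_with_retries(callback, db, max_attempts=None):
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        attempt += 1
        try:
            # The callback may run several times, so it should contain
            # only the transactional work itself.
            return callback(db)
        except RetryableError:
            continue  # Contention: try again.
        # Any other exception propagates to the caller immediately.
    raise ExceededMaxAttempts('gave up after %d attempts' % attempt)


calls = []

def flaky(db):
    calls.append(db)
    if len(calls) < 3:
        raise RetryableError('contention')
    return 'committed'

print(run_with_retries(flaky, db='fake-db', max_attempts=10))  # committed
```

Because the callback can be invoked more than once, side effects outside the database (sending email, writing files) are best kept out of it.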

Example of using run_transaction() to implement client-side retries for a transaction that transfers an amount from one account to another:

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app')

def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    not sufficient funds, then the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.

        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance

        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.id, Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.id, Account.balance)
                 .execute())
        return True, src.balance, dest.balance

    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be
    # automatically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)


class CockroachDatabase(database[, **kwargs])

CockroachDB implementation, based on the PostgresqlDatabase and using the psycopg2 driver.

Additional keyword arguments are passed to the psycopg2 connection constructor, and may be used to specify the database user, port, etc.

run_transaction(callback[, max_attempts=None[, system_time=None[, priority=None]]])
  • callback – callable that accepts a single db parameter (which will be the database instance this method is called from).
  • max_attempts (int) – max number of times to try before giving up.
  • system_time (datetime) – execute the transaction AS OF SYSTEM TIME with respect to the given value.
  • priority (str) – either “low”, “normal” or “high”.

Returns: the value returned by the callback.

Raises: ExceededMaxAttempts if max_attempts is exceeded.

Run SQL in a transaction with automatic client-side retries.

User-provided callback:

  • Must accept one parameter, the db instance representing the connection the transaction is running under.
  • Must not attempt to commit, rollback or otherwise manage the transaction.
  • May be called more than one time.
  • Should ideally only contain SQL operations.

Additionally, the database must not have any open transactions at the time this function is called, as CRDB does not support nested transactions. Attempting to do so will raise a NotImplementedError.

Simplest possible example:

def create_user(email):
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

user = create_user('')
class PooledCockroachDatabase(database[, **kwargs])

CockroachDB connection-pooling implementation, based on PooledPostgresqlDatabase. Implements the same APIs as CockroachDatabase, but will do client-side connection pooling.

run_transaction(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])

Run SQL in a transaction with automatic client-side retries. See CockroachDatabase.run_transaction() for details.

  • db (CockroachDatabase) – database instance.
  • callback – callable that accepts a single db parameter (which will be the same as the value passed above).


This function is equivalent to the identically-named method on the CockroachDatabase class.

class UUIDKeyField

UUID primary-key field that uses the CRDB gen_random_uuid() function to automatically populate the initial value.

class RowIDField

Auto-incrementing integer primary-key field that uses the CRDB unique_rowid() function to automatically populate the initial value.

See also:

  • BinaryJSONField from the Postgresql extension (available in the cockroachdb extension module, and aliased to JSONField).
  • ArrayField from the Postgresql extension.

MySQL Extensions

Peewee provides an alternate database implementation for using the mysql-connector driver. The implementation can be found in playhouse.mysql_ext.

Example usage:

from playhouse.mysql_ext import MySQLConnectorDatabase

# MySQL database implementation that utilizes mysql-connector driver.
db = MySQLConnectorDatabase('my_database', host='', user='mysql')

Additional MySQL-specific helpers:

class JSONField

Extends TextField and implements transparent JSON encoding and decoding in Python.

Match(columns, expr[, modifier=None])
  • columns – a single Field or a tuple of multiple fields.
  • expr (str) – the full-text search expression.
  • modifier (str) – optional modifiers for the search, e.g. ‘in boolean mode’.

Helper class for constructing MySQL full-text search queries of the form:

MATCH (columns, ...) AGAINST (expr[ modifier])


The playhouse.dataset module contains a high-level API for working with databases, modeled after the popular project of the same name. The aims of the dataset module are to provide:

  • A simplified API for working with relational data, along the lines of working with JSON.
  • An easy way to export relational data as JSON or CSV.
  • An easy way to import JSON or CSV data into a relational database.

A minimal data-loading script might look like this:

from playhouse.dataset import DataSet

db = DataSet('sqlite:///:memory:')

table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')

huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}

for obj in table:
    print(obj)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}

You can insert, update or delete using the dictionary APIs as well:

huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}

# Perform an update by supplying a partial record of changes.
table[1] = {'gender': 'male', 'age': 4}
print(table[1])
# {'age': 4, 'gender': 'male', 'id': 1, 'name': 'Huey'}

# Or insert a new record:
table[3] = {'name': 'Zaizee', 'age': 2}
print(table[3])
# {'age': 2, 'gender': None, 'id': 3, 'name': 'Zaizee'}

# Or delete a record:
del table[3]  # Remove the row we just added.

You can export or import data using freeze() and thaw():

# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')

# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')

Getting started

DataSet objects are initialized by passing in a database URL of the format dialect://user:password@host/dbname. See the Database URL section for examples of connecting to various databases.

# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')

Storing data

To store data, we must first obtain a reference to a table. If the table does not exist, it will be created automatically:

# Get a table reference, creating the table if it does not exist.
table = db['users']

We can now insert() new rows into the table. If the columns do not exist, they will be created automatically:

table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')

To update existing entries in the table, pass in a dictionary containing the new values and filter conditions. The list of columns to use as filters is specified in the columns argument. If no filter columns are specified, then all rows will be updated.

# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])

# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='none')
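
The split between filter columns and updated values can be sketched with the standard-library sqlite3 module. This is a rough illustration of the semantics only, not the DataSet implementation: update_rows() is a hypothetical helper, it joins filters with AND, and unlike DataSet it does not create missing columns. Table and column names are interpolated directly, so it must only be used with trusted identifiers.

```python
import sqlite3


def update_rows(conn, table, columns=None, **data):
    # Keys named in `columns` become WHERE filters; the rest are SET.
    filters = {name: data.pop(name) for name in (columns or [])}
    assignments = ', '.join('"%s" = ?' % name for name in data)
    sql = 'UPDATE "%s" SET %s' % (table, assignments)
    params = list(data.values())
    if filters:
        sql += ' WHERE ' + ' AND '.join('"%s" = ?' % name for name in filters)
        params.extend(filters.values())
    return conn.execute(sql, params).rowcount


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (name TEXT, gender TEXT, age INTEGER)')
conn.executemany('INSERT INTO users (name, age) VALUES (?, ?)',
                 [('Huey', 3), ('Mickey', 5)])

# Filtered update: only rows where name = 'Huey' are touched.
update_rows(conn, 'users', name='Huey', gender='male', columns=['name'])
# No filter columns: every row is updated.
update_rows(conn, 'users', age=9)

rows = conn.execute(
    'SELECT name, gender, age FROM users ORDER BY name').fetchall()
print(rows)  # [('Huey', 'male', 9), ('Mickey', None, 9)]
```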

Importing data

To import data from an external source, such as a JSON or CSV file, you can use the thaw() method. By default, new columns will be created for any attributes encountered. If you wish to only populate columns that are already defined on a table, you can pass in strict=True.

# Load data from a JSON file containing a list of objects.
table = db['stock_prices']
table.thaw(filename='stocks.json', format='json')
print(table.all()[:3])

# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
 {'id': 2, 'ticker': 'AAPL', 'price': 109},
 {'id': 3, 'ticker': 'AMZN', 'price': 300}]

Using transactions

DataSet supports nesting transactions using a simple context manager.

table = db['users']
with db.transaction() as txn:
    table.insert(name='Charlie')

    with db.transaction() as nested_txn:
        # Set Charlie's favorite ORM to Django.
        table.update(name='Charlie', favorite_orm='django', columns=['name'])

        # jk/lol
        nested_txn.rollback()

Inspecting the database

You can use the tables attribute to list the tables in the current database:

>>> print(db.tables)
['sometable', 'user']

And for a given table, you can print the columns:

>>> table = db['user']
>>> print(table.columns)
['id', 'age', 'name', 'gender', 'favorite_orm']

We can also find out how many rows are in a table:

>>> print(len(db['user']))

Reading data

To retrieve all rows, you can use the all() method:

# Retrieve all the users.
users = db['user'].all()

# We can iterate over all rows without calling `.all()`
for user in db['user']:
    print(user['name'])

Specific objects can be retrieved using find() and find_one().

# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')

# Find Huey.
huey = db['user'].find_one(name='Huey')

Exporting data

To export data, use the freeze() method, passing in the query you wish to export:

peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')


class DataSet(url, **kwargs)

The DataSet class provides a high-level API for working with relational databases.


tables

Return a list of tables stored in the database. This list is computed dynamically each time it is accessed.

__getitem__(table_name)

Provide a Table reference to the specified table. If the table does not exist, it will be created.

query(sql[, params=None[, commit=True]])
  • sql (str) – A SQL query.
  • params (list) – Optional parameters for the query.
  • commit (bool) – Whether the query should be committed upon execution.

Returns: A database cursor.

Execute the provided query against the database.

transaction()

Create a context manager representing a new transaction (or savepoint).

freeze(query[, format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
  • query – A SelectQuery, generated using all() or find().
  • format – Output format. By default, csv and json are supported.
  • filename – Filename to write output to.
  • file_obj – File-like object to write output to.
  • kwargs – Arbitrary parameters for export-specific functionality.
thaw(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
  • table (str) – The name of the table to load data into.
  • format – Input format. By default, csv and json are supported.
  • filename – Filename to read data from.
  • file_obj – File-like object to read data from.
  • strict (bool) – If True, only populate columns that already exist on the table; otherwise new columns are created for any attributes encountered.
  • kwargs – Arbitrary parameters for import-specific functionality.

connect()

Open a connection to the underlying database. If a connection is not opened explicitly, one will be opened the first time a query is executed.

close()

Close the connection to the underlying database.

class Table(dataset, name, model_class)

Provides a high-level API for working with rows in a given table.


columns

Return a list of columns in the given table.

model_class

A dynamically-created Model class.

create_index(columns[, unique=False])

Create an index on the given columns:

# Create a unique index on the `username` column.
db['users'].create_index(['username'], unique=True)

insert(**data)

Insert the given data dictionary into the table, creating new columns as needed.

update(columns=None, conjunction=None, **data)

Update the table using the provided data. If one or more columns are specified in the columns parameter, then those columns’ values in the data dictionary will be used to determine which rows to update.

# Update all rows.
db['users'].update(favorite_orm='peewee')
# Only update Huey's record, setting his age to 3.
db['users'].update(name='Huey', age=3, columns=['name'])

find(**query)

Query the table for rows matching the specified equality conditions. If no query is specified, then all rows are returned.

peewee_users = db['users'].find(favorite_orm='peewee')

find_one(**query)

Return a single row matching the specified equality conditions. If no matching row is found then None will be returned.

huey = db['users'].find_one(name='Huey')

all()

Return all rows in the given table.

delete(**query)

Delete all rows matching the given equality conditions. If no query is provided, then all rows will be deleted.

# Adios, Django!
db['users'].delete(favorite_orm='Django')

# Delete all the secret messages.
db['secret_messages'].delete()
freeze([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
  • format – Output format. By default, csv and json are supported.
  • filename – Filename to write output to.
  • file_obj – File-like object to write output to.
  • kwargs – Arbitrary parameters for export-specific functionality.
thaw([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
  • format – Input format. By default, csv and json are supported.
  • filename – Filename to read data from.
  • file_obj – File-like object to read data from.
  • strict (bool) – If True, only populate columns that already exist on the table; otherwise new columns are created for any attributes encountered.
  • kwargs – Arbitrary parameters for import-specific functionality.


These fields can be found in the playhouse.fields module.

class CompressedField([compression_level=6[, algorithm='zlib'[, **kwargs]]])
  • compression_level (int) – A value from 0 to 9.
  • algorithm (str) – Either 'zlib' or 'bz2'.

Stores compressed data using the specified algorithm. This field extends BlobField, transparently storing a compressed representation of the data in the database.
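
The transparent round trip can be sketched with the standard-library zlib module. This is an illustration of the idea only: to_db_value() and from_db_value() are hypothetical helper names, not the field's actual methods, and the sketch assumes text is encoded as UTF-8 before compression.

```python
import zlib


def to_db_value(text, compression_level=6):
    # What would be written to the BLOB column.
    return zlib.compress(text.encode('utf-8'), compression_level)


def from_db_value(blob):
    # What would be handed back to Python code.
    return zlib.decompress(blob).decode('utf-8')


original = 'peewee ' * 1000
stored = to_db_value(original)

print(len(original), len(stored) < len(original))  # 7000 True
print(from_db_value(stored) == original)           # True
```

Highly repetitive data compresses well; short or already-compressed values may see little benefit.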

class PickleField

Stores arbitrary Python data by transparently pickling and un-pickling data stored in the field. This field extends BlobField. If the cPickle module is available, it will be used.

Hybrid Attributes

Hybrid attributes encapsulate functionality that operates at both the Python and SQL levels. The idea for hybrid attributes comes from a feature of the same name in SQLAlchemy. Consider the following example:

from playhouse.hybrid import hybrid_method, hybrid_property

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

The hybrid attribute gets its name from the fact that the length attribute will behave differently depending on whether it is accessed via the Interval class or an Interval instance.

If accessed via an instance, then it behaves just as you would expect.

If accessed via the Interval.length class attribute, however, the length calculation will be expressed as a SQL expression. For example:

query = Interval.select().where(Interval.length > 5)

This query will be equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)
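
The dual behavior relies on Python's descriptor protocol: a descriptor can tell whether it is being accessed through the class or through an instance. The following is a simplified sketch of that mechanism, not the playhouse.hybrid source. The Expr class and simple_hybrid_property are hypothetical stand-ins, and Expr merely renders text where Peewee would build a real SQL expression.

```python
class Expr:
    """Tiny stand-in for a SQL expression; it only renders text."""

    def __init__(self, sql):
        self.sql = sql

    def __sub__(self, other):
        return Expr('(%s - %s)' % (self.sql, _sql(other)))

    def __gt__(self, other):
        return Expr('(%s > %s)' % (self.sql, _sql(other)))


def _sql(value):
    return value.sql if isinstance(value, Expr) else repr(value)


class simple_hybrid_property:
    def __init__(self, fget):
        self.fget = fget

    def __get__(self, instance, owner):
        if instance is None:
            # Class access: run the same function against the class,
            # whose attributes are expression objects.
            return self.fget(owner)
        # Instance access: plain Python values.
        return self.fget(instance)


class Interval:
    start = Expr('"start"')
    end = Expr('"end"')

    def __init__(self, start, end):
        self.start = start
        self.end = end

    @simple_hybrid_property
    def length(self):
        return self.end - self.start


print(Interval(3, 10).length)     # 7
print((Interval.length > 5).sql)  # (("end" - "start") > 5)
```

The same function body runs in both cases; only the objects it operates on differ.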

The playhouse.hybrid module also contains a decorator for implementing hybrid methods which can accept parameters. As with hybrid properties, when accessed via a model instance, then the function executes normally as-written. When the hybrid method is called on the class, however, it will generate a SQL expression.


query = Interval.select().where(Interval.contains(2))

This query is equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))

There is an additional API for situations where the python implementation differs slightly from the SQL implementation. Let’s add a radius method to the Interval model. Because this method calculates an absolute value, we will use the Python abs() function for the instance portion and the fn.ABS() SQL function for the class portion.

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

What is neat is that both the radius implementations refer to the length hybrid attribute! When accessed via an Interval instance, the radius calculation will be executed in Python. When invoked via an Interval class, we will get the appropriate SQL.


query = Interval.select().where(Interval.radius < 3)

This query is equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)

Pretty neat, right? Thanks for the cool idea, SQLAlchemy!

Hybrid API

class hybrid_method(func[, expr=None])

Method decorator that allows the definition of a Python object method with both instance-level and class-level behavior.


class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

When called with an Interval instance, the contains method will behave as you would expect. When called as a classmethod, though, a SQL expression will be generated:

query = Interval.select().where(Interval.contains(2))

Would generate the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))

expression(expr)

Method decorator for specifying the SQL-expression producing method.

class hybrid_property(fget[, fset=None[, fdel=None[, expr=None]]])

Method decorator that allows the definition of a Python object property with both instance-level and class-level behavior.


class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

When accessed on an Interval instance, the length and radius properties will behave as you would expect. When accessed as class attributes, though, a SQL expression will be generated instead:

query = (Interval
         .select()
         .where(
             (Interval.length > 6) &
             (Interval.radius >= 3)))

Would generate the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (
    (("t1"."end" - "t1"."start") > 6) AND
    ((abs("t1"."end" - "t1"."start") / 2) >= 3)
)

Key/Value Store

The playhouse.kv module contains the implementation of a persistent dictionary.

class KeyValue([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name='keyvalue']]]]])
  • key_field (Field) – field to use for key. Defaults to CharField. Must have primary_key=True.
  • value_field (Field) – field to use for value. Defaults to PickleField.
  • ordered (bool) – data should be returned in key-sorted order.
  • database (Database) – database where key/value data is stored. If not specified, an in-memory SQLite database will be used.
  • table_name (str) – table name for data storage.

Dictionary-like API for storing key/value data. Like dictionaries, supports the expected APIs, but also has the added capability of accepting expressions for getting, setting and deleting items.

Table is created automatically (if it doesn’t exist) when the KeyValue is instantiated.

Uses efficient upsert implementation for setting and updating/overwriting key/value pairs.
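
To illustrate what an upsert-backed dictionary looks like, here is a minimal sketch using only the standard-library sqlite3 and pickle modules. It is not the KeyValue implementation: TinyKV is a hypothetical class, and it assumes SQLite's INSERT OR REPLACE as the upsert and pickled values stored in a BLOB column.

```python
import pickle
import sqlite3


class TinyKV:
    def __init__(self):
        self.conn = sqlite3.connect(':memory:')
        # The table is created on instantiation if it does not exist.
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS keyvalue '
            '(key TEXT PRIMARY KEY, value BLOB)')

    def __setitem__(self, key, value):
        # A single statement either inserts the key or overwrites it.
        self.conn.execute(
            'INSERT OR REPLACE INTO keyvalue (key, value) VALUES (?, ?)',
            (key, pickle.dumps(value)))

    def __getitem__(self, key):
        row = self.conn.execute(
            'SELECT value FROM keyvalue WHERE key = ?', (key,)).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def __len__(self):
        return self.conn.execute(
            'SELECT COUNT(*) FROM keyvalue').fetchone()[0]


kv = TinyKV()
kv['k1'] = 'v1'
kv['k1'] = 'v1-x'     # Overwrites rather than raising a conflict.
kv['k2'] = [1, 2, 3]

print(kv['k1'], kv['k2'], len(kv))  # v1-x [1, 2, 3] 2
```

Using one statement for both insert and overwrite avoids a separate existence check before each write.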

Basic examples:

# Create a key/value store, which uses an in-memory SQLite database
# for data storage.
KV = KeyValue()

# Set (or overwrite) the value for "k1".
KV['k1'] = 'v1'

# Set (or update) multiple keys at once (uses an efficient upsert).
KV.update(k2='v2', k3='v3')

# Getting values works as you'd expect.
assert KV['k2'] == 'v2'

# We can also do this:
for value in KV[KV.key > 'k1']:
    print(value)

# 'v2'
# 'v3'

# Update multiple values at once using expression:
KV[KV.key > 'k1'] = 'vx'

# What's stored in the KV?
print(dict(KV))

# {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'}

# Delete a single item.
del KV['k2']

# How many items are stored in the KV?
print(len(KV))
# 2

# Delete items that match the given condition.
del KV[KV.key > 'k1']
__contains__(expr)
Parameters: expr – a single key or an expression.
Returns: Boolean whether key/expression exists.


>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2')

>>> 'k1' in KV
True
>>> 'kx' in KV
False

>>> (KV.key < 'k2') in KV
True
>>> (KV.key > 'k2') in KV
False
__len__()
Returns: Count of items stored.
__getitem__(expr)
Parameters: expr – a single key or an expression.
Returns: value(s) corresponding to key/expression.
Raises: KeyError if single key given and not found.


>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')

>>> KV['k1']
'v1'
>>> KV['kx']
KeyError: "kx" not found

>>> KV[KV.key > 'k1']
['v2', 'v3']
>>> KV[KV.key < 'k1']
[]
__setitem__(expr, value)
  • expr – a single key or an expression.
  • value – value to set for key(s)

Set value for the given key. If expr is an expression, then any keys matching the expression will have their value updated.


>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')

>>> KV['k1'] = 'v1-x'
>>> print(KV['k1'])
v1-x

>>> KV[KV.key >= 'k2'] = 'v99'
>>> dict(KV)
{'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
__delitem__(expr)
Parameters: expr – a single key or an expression.

Delete the given key. If an expression is given, delete all keys that match the expression.


>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2, k3=3)

>>> del KV['k1']  # Deletes "k1".
>>> del KV['k1']
KeyError: "k1" does not exist

>>> del KV[KV.key > 'k2']  # Deletes "k3".
>>> del KV[KV.key > 'k99']  # Nothing deleted, no keys match.
keys()
Returns: an iterable of all keys in the table.
values()
Returns: an iterable of all values in the table.
items()
Returns: an iterable of all key/value pairs in the table.
update([__data=None[, **mapping]])

Efficiently bulk-insert or replace the given key/value pairs.


>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2)  # Sets 'k1'=1, 'k2'=2.

>>> dict(KV)
{'k1': 1, 'k2': 2}

>>> KV.update(k2=22, k3=3)  # Updates 'k2'->22, sets 'k3'=3.

>>> dict(KV)
{'k1': 1, 'k2': 22, 'k3': 3}

>>> KV.update({'k2': -2, 'k4': 4})  # Also can pass a dictionary.

>>> dict(KV)
{'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
get(expr[, default=None])
  • expr – a single key or an expression.
  • default – default value if key not found.

Returns: value of given key/expr or default if single key not found.

Get the value at the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list will be returned.

pop(expr[, default=Sentinel])
  • expr – a single key or an expression.
  • default – default value if key does not exist.

Returns: value of given key/expr or default if single key not found.

Get value and delete the given key. If the key does not exist, the default value is returned, unless the key is an expression in which case an empty list is returned.

clear()

Remove all items from the key-value table.


The playhouse.shortcuts module contains helper functions for expressing things that would otherwise be somewhat verbose or cumbersome using peewee’s APIs. There are also helpers for serializing models to dictionaries and vice-versa.

model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]]])
  • recurse (bool) – Whether foreign-keys should be recursed.
  • backrefs (bool) – Whether lists of related objects should be recursed.
  • only – A list (or set) of field instances which should be included in the result dictionary.
  • exclude – A list (or set) of field instances which should be excluded from the result dictionary.
  • extra_attrs – A list of attribute or method names on the instance which should be included in the dictionary.
  • fields_from_query (Select) – The SelectQuery that created this model instance. Only the fields and values explicitly selected by the query will be serialized.
  • max_depth (int) – Maximum depth when recursing.
  • manytomany (bool) – Process many-to-many fields.

Convert a model instance (and optionally any related instances) to a dictionary.


>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}

>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
  'id': 1,
  'tweets': [
    {'id': 1, 'message': 'tweet-1'},
    {'id': 2, 'message': 'tweet-2'},
  ],
  'username': 'charlie'
}

>>> model_to_dict(t1)
{
  'id': 1,
  'message': 'tweet-1',
  'user': {
    'id': 1,
    'username': 'charlie'
  }
}

>>> model_to_dict(t2, recurse=False)
{'id': 2, 'message': 'tweet-2', 'user': 1}

The implementation of model_to_dict is fairly complex, owing to the various usages it attempts to support. If you have a special usage, I strongly advise that you do not attempt to shoe-horn some crazy combination of parameters into this function. Just write a simple function that accomplishes exactly what you’re attempting to do.

dict_to_model(model_class, data[, ignore_unknown=False])
  • model_class (Model) – The model class to construct.
  • data (dict) – A dictionary of data. Foreign keys can be included as nested dictionaries, and back-references as lists of dictionaries.
  • ignore_unknown (bool) – Whether to allow unrecognized (non-field) attributes.

Convert a dictionary of data to a model instance, creating related instances where appropriate.


>>> user_data = {'id': 1, 'username': 'charlie'}
>>> user = dict_to_model(User, user_data)
>>> user
<__main__.User at 0x7fea8fa4d490>

>>> user.username
'charlie'

>>> note_data = {'id': 2, 'text': 'note text', 'user': user_data}
>>> note = dict_to_model(Note, note_data)
>>> note.text
'note text'
>>> note.user.username
'charlie'

>>> user_with_notes = {
...     'id': 1,
...     'username': 'charlie',
...     'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]}
>>> user = dict_to_model(User, user_with_notes)
>>> user.notes[0].text
'note-1'
>>> user.notes[0].user.username
'charlie'

update_model_from_dict(instance, data[, ignore_unknown=False])
  • instance (Model) – The model instance to update.
  • data (dict) – A dictionary of data. Foreign keys can be included as nested dictionaries, and back-references as lists of dictionaries.
  • ignore_unknown (bool) – Whether to allow unrecognized (non-field) attributes.

Update a model instance with the given data dictionary.

Signal support

Models with hooks for signals (a-la django) are provided in playhouse.signals. To use the signals, you will need all of your project’s models to be a subclass of playhouse.signals.Model, which overrides the necessary methods to provide support for the various signals.

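from peewee import IntegerField
from playhouse.signals import Model, post_save

class MyModel(Model):
    data = IntegerField()

@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
    # The handler body is missing from the source; react to the save here.
    pass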


For what I hope are obvious reasons, Peewee signals do not work when you use the Model.insert(), Model.update(), or Model.delete() methods. These methods generate queries that execute beyond the scope of the ORM, and the ORM does not know about which model instances might or might not be affected when the query executes.

Signals work by hooking into the higher-level peewee APIs like Model.save() and Model.delete_instance(), where the affected model instance is known ahead of time.

The following signals are provided:

pre_save
Called immediately before an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
post_save
Called immediately after an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
pre_delete
Called immediately before an object is deleted from the database when Model.delete_instance() is used.
post_delete
Called immediately after an object is deleted from the database when Model.delete_instance() is used.
pre_init
Called when a model class is first instantiated.

Connecting handlers

Whenever a signal is dispatched, it will call any handlers that have been registered. This allows totally separate code to respond to events like model save and delete.

The Signal class provides a connect() method, which takes a callback function and two optional parameters for “sender” and “name”. If specified, the “sender” parameter should be a single model class and allows your callback to only receive signals from that one model class. The “name” parameter is used as a convenient alias in the event you wish to unregister your signal handler.

Example usage:

from playhouse.signals import *

def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)

# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)

All signal handlers accept as their first two arguments sender and instance, where sender is the model class and instance is the actual model being acted upon.

If you’d like, you can also use a decorator to connect signal handlers. This is functionally equivalent to the above example:

@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)

Signal API

class Signal

Stores a list of receivers (callbacks) and calls them when the “send” method is invoked.

connect(receiver[, sender=None[, name=None]])
  • receiver (callable) – a callable that takes at least two parameters, a “sender”, which is the Model subclass that triggered the signal, and an “instance”, which is the actual model instance.
  • sender (Model) – if specified, only instances of this model class will trigger the receiver callback.
  • name (string) – a short alias

Add the receiver to the internal list of receivers, which will be called whenever the signal is sent.

from playhouse.signals import post_save
from project.handlers import cache_buster

post_save.connect(cache_buster, name='project.cache_buster')
disconnect([receiver=None[, name=None]])
  • receiver (callable) – the callback to disconnect
  • name (string) – a short alias

Disconnect the given receiver (or the receiver with the given name alias) so that it no longer is called. Either the receiver or the name must be provided.

send(instance, *args, **kwargs)
Parameters:instance – a model instance

Iterates over the receivers and will call them in the order in which they were connected. If the receiver specified a sender, it will only be called if the instance is an instance of the sender.

pwiz, a model generator

pwiz is a little script that ships with peewee and is capable of introspecting an existing database and generating model code suitable for interacting with the underlying data. If you have a database already, pwiz can give you a nice boost by generating skeleton code with correct column affinities and foreign keys.

If you install peewee using setup.py install, pwiz will be installed as a “script” and you can just run:

python -m pwiz -e postgresql -u postgres my_postgres_db

This will print a bunch of models to standard output. So you can do this:

python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print([blog.name for blog in Blog.select()])

Command-line options

pwiz accepts the following command-line options:

Option  Meaning                              Example
-h      show help
-e      database backend                     -e mysql
-H      host to connect to                   -H remote.db.server
-p      port to connect on                   -p 9001
-u      database user                        -u postgres
-P      database password                    -P (will be prompted for password)
-s      schema                               -s public
-t      tables to generate                   -t tweet,users,relationships
-v      generate models for VIEWs            (no argument)
-i      add info metadata to generated file  (no argument)
-o      table column order is preserved      (no argument)

The following are valid parameters for the engine (-e):

  • sqlite
  • mysql
  • postgresql


If a password is required to access your database, you will be prompted to enter it using a secure prompt.

The password will be included in the output. Specifically, at the top of the file a Database will be defined along with any required parameters – including the password.

pwiz examples

Examples of introspecting various databases:

# Introspect a Sqlite database.
python -m pwiz -e sqlite path/to/sqlite_database.db

# Introspect a MySQL database, logging in as root. You will be prompted
# for a password ("-P").
python -m pwiz -e mysql -u root -P mysql_db_name

# Introspect a Postgresql database on a remote server.
python -m pwiz -e postgresql -u postgres -H remote.db.server pg_db_name

Full example:

$ sqlite3 example.db << EOM
CREATE TABLE "user" ("id" INTEGER NOT NULL PRIMARY KEY, "username" TEXT NOT NULL);
CREATE TABLE "tweet" (
    "id" INTEGER NOT NULL PRIMARY KEY,
    "content" TEXT NOT NULL,
    "timestamp" DATETIME NOT NULL,
    "user_id" INTEGER NOT NULL,
    FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
CREATE UNIQUE INDEX "user_username" ON "user" ("username");
EOM

$ python -m pwiz -e sqlite example.db

Produces the following output:

from peewee import *

database = SqliteDatabase('example.db', **{})

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class User(BaseModel):
    username = TextField(unique=True)

    class Meta:
        table_name = 'user'

class Tweet(BaseModel):
    content = TextField()
    timestamp = DateTimeField()
    user = ForeignKeyField(column_name='user_id', field='id', model=User)

    class Meta:
        table_name = 'tweet'


  • The foreign-key Tweet.user_id is detected and mapped correctly.
  • The User.username UNIQUE constraint is detected.
  • Each model explicitly declares its table name, even in cases where it is not necessary (as Peewee would automatically translate the class name into the appropriate table name).
  • All the parameters of the ForeignKeyField are explicitly declared, even though they follow the conventions Peewee uses by default.


The UnknownField is a placeholder that is used in the event your schema contains a column declaration that Peewee doesn’t know how to map to a field class.

Schema Migrations

Peewee now supports schema migrations, with well-tested support for Postgresql, SQLite and MySQL. Unlike other schema migration tools, peewee’s migrations do not handle introspection and database “versioning”. Rather, peewee provides a number of helper functions for generating and running schema-altering statements. This engine provides the basis on which a more sophisticated tool could some day be built.

Migrations can be written as simple python scripts and executed from the command-line. Since the migrations only depend on your application’s Database object, it should be easy to manage changing your model definitions and maintaining a set of migration scripts without introducing dependencies.

Example usage

Begin by importing the helpers from the migrate module:

from playhouse.migrate import *

Instantiate a migrator. The SchemaMigrator class is responsible for generating schema altering operations, which can then be run sequentially by the migrate() helper.

# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)

# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)

Use migrate() to execute one or more operations:

title_field = CharField(default='')
status_field = IntegerField(null=True)

migrate(
    migrator.add_column('some_table', 'title', title_field),
    migrator.add_column('some_table', 'status', status_field),
    migrator.drop_column('some_table', 'old_column'),
)


Migrations are not run inside a transaction. If you wish the migration to run in a transaction you will need to wrap the call to migrate in an atomic() context-manager, e.g.

with my_db.atomic():
    migrate(...)

Supported Operations

Add new field(s) to an existing model:

# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')

# Run the migration, specifying the database table, field name and field.
migrate(
    migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
    migrator.add_column('comment_tbl', 'comment', comment_field),
)

Renaming a field:

# Specify the table, original name of the column, and its new name.
migrate(
    migrator.rename_column('story', 'pub_date', 'publish_date'),
    migrator.rename_column('story', 'mod_date', 'modified_date'),
)

Dropping a field:

migrate(
    migrator.drop_column('story', 'some_old_field'),
)

Making a field nullable or not nullable:

# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
    # Make `pub_date` allow NULL values.
    migrator.drop_not_null('story', 'pub_date'),

    # Prevent `modified_date` from containing NULL values.
    migrator.add_not_null('story', 'modified_date'),
)

Altering a field’s data-type:

# Change a VARCHAR(50) field to a TEXT field.
migrate(
    migrator.alter_column_type('person', 'email', TextField()),
)

Renaming a table:

migrate(
    migrator.rename_table('story', 'stories_tbl'),
)

Adding an index:

# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
    # Create an index on the `pub_date` column.
    migrator.add_index('story', ('pub_date',), False),

    # Create a multi-column index on the `pub_date` and `status` fields.
    migrator.add_index('story', ('pub_date', 'status'), False),

    # Create a unique index on the category and title fields.
    migrator.add_index('story', ('category_id', 'title'), True),
)

Dropping an index:

# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))

Adding or dropping table constraints:

# Add a CHECK() constraint to enforce the price cannot be negative.
migrate(migrator.add_constraint(
    'products',
    'price_check',
    Check('price >= 0')))

# Remove the price check constraint.
migrate(migrator.drop_constraint('products', 'price_check'))

# Add a UNIQUE constraint on the first and last names.
migrate(migrator.add_unique('person', 'first_name', 'last_name'))


Postgres users may need to set the search-path when using a non-standard schema. This can be done as follows:

new_field = TextField(default='', null=False)
migrator = PostgresqlMigrator(db)
migrate(migrator.set_search_path('my_schema_name'),
        migrator.add_column('table', 'field_name', new_field))

Migrations API


migrate(*operations)

Execute one or more schema altering operations.

migrate(
    migrator.add_column('some_table', 'new_column', CharField(default='')),
    migrator.add_index('some_table', ('new_column',)),
)

class SchemaMigrator(database)
Parameters:database – a Database instance.

The SchemaMigrator is responsible for generating schema-altering statements.

add_column(table, column_name, field)
  • table (str) – Name of the table to add column to.
  • column_name (str) – Name of the new column.
  • field (Field) – A Field instance.

Add a new column to the provided table. The field provided will be used to generate the appropriate column definition.


If the field is not nullable it must specify a default value.


For non-null fields, the field will initially be added as a null field, then an UPDATE statement will be executed to populate the column with the default value. Finally, the column will be marked as not null.

drop_column(table, column_name[, cascade=True])
  • table (str) – Name of the table to drop column from.
  • column_name (str) – Name of the column to drop.
  • cascade (bool) – Whether the column should be dropped with CASCADE.
rename_column(table, old_name, new_name)
  • table (str) – Name of the table containing column to rename.
  • old_name (str) – Current name of the column.
  • new_name (str) – New name for the column.
add_not_null(table, column)
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make not nullable.
drop_not_null(table, column)
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make nullable.
alter_column_type(table, column, field[, cast=None])
  • table (str) – Name of the table.
  • column (str) – Name of the column to modify.
  • field (Field) – Field instance representing new data type.
  • cast – (postgres-only) specify a cast expression if the data-types are incompatible, e.g. column_name::int. Can be provided as either a string or a Cast instance.

Alter the data-type of a column. This method should be used with care, as using incompatible types may not be well-supported by your database.

rename_table(old_name, new_name)
  • old_name (str) – Current name of the table.
  • new_name (str) – New name for the table.
add_index(table, columns[, unique=False[, using=None]])
  • table (str) – Name of table on which to create the index.
  • columns (list) – List of columns which should be indexed.
  • unique (bool) – Whether the new index should specify a unique constraint.
  • using (str) – Index type (where supported), e.g. GiST or GIN.
drop_index(table, index_name)
  • table (str) – Name of the table containing the index to be dropped.
  • index_name (str) – Name of the index to be dropped.
add_constraint(table, name, constraint)
  • table (str) – Table to add constraint to.
  • name (str) – Name used to identify the constraint.
  • constraint – either a Check() constraint or for adding an arbitrary constraint use SQL.
drop_constraint(table, name)
  • table (str) – Table to drop constraint from.
  • name (str) – Name of constraint to drop.
add_unique(table, *column_names)
  • table (str) – Table to add constraint to.
  • column_names (str) – One or more columns for UNIQUE constraint.
class PostgresqlMigrator(database)

Generate migrations for Postgresql databases.

set_search_path(schema_name)
Parameters:schema_name (str) – Schema to use.

Set the search path (schema) for the subsequent operations.

class SqliteMigrator(database)

Generate migrations for SQLite databases.

SQLite has limited support for ALTER TABLE queries, so the following operations are currently not supported for SQLite:

  • add_constraint
  • drop_constraint
  • add_unique
class MySQLMigrator(database)

Generate migrations for MySQL databases.


Reflection

The reflection module contains helpers for introspecting existing databases. This module is used internally by several other modules in the playhouse, including DataSet and pwiz, a model generator.

generate_models(database[, schema=None[, **options]])

Returns:a dict mapping table names to model classes.

Generate models for the tables in the given database. For an example of how to use this function, see the section Using Peewee Interactively.


>>> from peewee import *
>>> from playhouse.reflection import generate_models
>>> db = PostgresqlDatabase('my_app')
>>> models = generate_models(db)
>>> list(models.keys())
['account', 'customer', 'order', 'orderitem', 'product']

>>> globals().update(models)  # Inject models into namespace.
>>> for cust in customer.select():  # Query using generated model.
...     print(cust.name)
...

Huey Kitty
Mickey Dog

print_model(model)
Parameters:model (Model) – model class to print
Returns:no return value

Print a user-friendly description of a model class, useful for debugging or interactive use. Currently this prints the table name, and all fields along with their data-types. The Using Peewee Interactively section contains an example.

Example output:

>>> from playhouse.reflection import print_model
>>> print_model(User)
user
  id AUTO PK
  email TEXT
  name TEXT
  dob DATE

index(es)
  email UNIQUE

>>> print_model(Tweet)
tweet
  id AUTO PK
  user INT FK: User.id
  title TEXT
  content TEXT
  timestamp DATETIME
  is_published BOOL

index(es)
  user_id
  is_published, timestamp

print_table_sql(model)
Parameters:model (Model) – model to print
Returns:no return value

Prints the SQL CREATE TABLE for the given model class, which may be useful for debugging or interactive use. See the Using Peewee Interactively section for example usage. Note that indexes and constraints are not included in the output of this function.

Example output:

>>> from playhouse.reflection import print_table_sql
>>> print_table_sql(User)
CREATE TABLE IF NOT EXISTS "user" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "email" TEXT NOT NULL,
  "name" TEXT NOT NULL,
  "dob" DATE NOT NULL
)

>>> print_table_sql(Tweet)
CREATE TABLE IF NOT EXISTS "tweet" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "user_id" INTEGER NOT NULL,
  "title" TEXT NOT NULL,
  "content" TEXT NOT NULL,
  "timestamp" DATETIME NOT NULL,
  "is_published" INTEGER NOT NULL,
  FOREIGN KEY ("user_id") REFERENCES "user" ("id")
)

class Introspector(metadata[, schema=None])

Metadata can be extracted from a database by instantiating an Introspector. Rather than instantiating this class directly, it is recommended to use the factory method from_database().

classmethod from_database(database[, schema=None])
  • database – a Database instance.
  • schema (str) – an optional schema (supported by some databases).

Creates an Introspector instance suitable for use with the given database.


from peewee import SqliteDatabase
from playhouse.reflection import Introspector

db = SqliteDatabase('my_app.db')
introspector = Introspector.from_database(db)
models = introspector.generate_models()

# User and Tweet (assumed to exist in the database) are
# peewee Model classes generated from the database schema.
User = models['user']
Tweet = models['tweet']
generate_models([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False[, include_views=False]]]]])
  • skip_invalid (bool) – Skip tables whose names are invalid python identifiers.
  • table_names (list) – List of table names to generate. If unspecified, models are generated for all tables.
  • literal_column_names (bool) – Use column-names as-is. By default, column names are “python-ized”, i.e. mixed-case becomes lower-case.
  • bare_fields – SQLite-only. Do not specify data-types for introspected columns.
  • include_views – generate models for VIEWs as well.

Returns:A dictionary mapping table-names to model classes.

Introspect the database, reading in the tables, columns, and foreign key constraints, then generate a dictionary mapping each database table to a dynamically-generated Model class.

Database URL

This module contains a helper function to generate a database connection from a URL connection string.

connect(url, **connect_params)

Create a Database instance from the given connection URL.


  • sqlite:///my_database.db will create a SqliteDatabase instance for the file my_database.db in the current directory.
  • sqlite:///:memory: will create an in-memory SqliteDatabase instance.
  • postgresql://postgres:my_password@localhost:5432/my_database will create a PostgresqlDatabase instance. A username and password are provided, as well as the host and port to connect to.
  • mysql://user:passwd@ip:port/my_db will create a MySQLDatabase instance for the local MySQL database my_db.
  • mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 will create a PooledMySQLDatabase instance for the local MySQL database my_db with max_connections set to 20 and a stale_timeout setting of 300 seconds.

Supported schemes include sqlite, postgresql, mysql and mysql+pool, all of which appear in the examples above.


import os
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')

parse(url)

Parse the information in the given URL into a dictionary containing database, host, port, user and/or password. Additional connection arguments can be passed in the URL query string.

If you are using a custom database class, you can use the parse() function to extract information from a URL which can then be passed in to your database object.

register_database(db_class, *names)
  • db_class – A subclass of Database.
  • names – A list of names to use as the scheme in the URL, e.g. ‘sqlite’ or ‘firebird’

Register additional database class under the specified names. This function can be used to extend the connect() function to support additional schemes. Suppose you have a custom database class for Firebird named FirebirdDatabase.

from playhouse.db_url import connect, register_database

register_database(FirebirdDatabase, 'firebird')
db = connect('firebird://my-firebird-db')

Connection pool

The pool module contains a number of Database classes that provide connection pooling for PostgreSQL, MySQL and SQLite databases. The pool works by overriding the methods on the Database class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.

In a multi-threaded application, up to max_connections will be opened. Each thread (or, if using gevent, greenlet) will have its own connection.

In a single-threaded application, only one connection will be created. It will be continually recycled until either it exceeds the stale timeout or is closed explicitly (using .manual_close()).

By default, all your application needs to do is ensure that connections are closed when you are finished with them, and they will be returned to the pool. For web applications, this typically means that at the beginning of a request, you will open a connection, and when you return a response, you will close the connection.

Simple Postgres pool example code:

# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_database',
    max_connections=8,
    stale_timeout=300,  # 5 minutes.
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

That’s it! If you would like finer-grained control over the pool of connections, check out the Advanced Connection Management section.

Pool APIs

class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])
  • database (str) – The name of the database or database file.
  • max_connections (int) – Maximum number of connections. Provide None for unlimited.
  • stale_timeout (int) – Number of seconds to allow connections to be used.
  • timeout (int) – Number of seconds to block when pool is full. By default peewee does not block when the pool is full but simply throws an exception. To block indefinitely set this value to 0.
  • kwargs – Arbitrary keyword arguments passed to database class.

Mixin class intended to be used with a subclass of Database.


Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.


If the number of open connections exceeds max_connections, a ValueError will be raised.


manual_close()

Close the currently-open connection without returning it to the pool.

close_idle()

Close all idle connections. This does not include any connections that are currently in-use – only those that were previously created but have since been returned back to the pool.

close_stale([age=600])
Parameters:age (int) – Age at which a connection should be considered stale.
Returns:Number of connections closed.

Close connections which are in-use but exceed the given age. Use caution when calling this method!

close_all()

Close all connections. This includes any connections that may be in use at the time. Use caution when calling this method!

class PooledPostgresqlDatabase

Subclass of PostgresqlDatabase that mixes in the PooledDatabase helper.

class PooledPostgresqlExtDatabase

Subclass of PostgresqlExtDatabase that mixes in the PooledDatabase helper. The PostgresqlExtDatabase is a part of the Postgresql Extensions module and provides support for many Postgres-specific features.

class PooledMySQLDatabase

Subclass of MySQLDatabase that mixes in the PooledDatabase helper.

class PooledSqliteDatabase

Persistent connections for SQLite apps.

class PooledSqliteExtDatabase

Persistent connections for SQLite apps, using the SQLite Extensions advanced database driver SqliteExtDatabase.

Test Utils

Contains utilities helpful when testing peewee projects.

class count_queries([only_select=False])

Context manager that will count the number of queries executed within the context.

Parameters:only_select (bool) – Only count SELECT queries.
with count_queries() as counter:
    huey = User.get(User.username == 'huey')
    huey_tweets = [tweet.message for tweet in huey.tweets]

assert counter.count == 2

count

The number of queries executed.

get_queries()

Return a list of 2-tuples consisting of the SQL query and a list of parameters.

assert_query_count(expected[, only_select=False])

Function or method decorator that will raise an AssertionError if the number of queries executed in the decorated function does not equal the expected number.

class TestMyApp(unittest.TestCase):
    @assert_query_count(1)
    def test_get_popular_blogs(self):
        popular_blogs = Blog.get_popular()
        self.assertEqual(
            [blog.title for blog in popular_blogs],
            ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])

This function can also be used as a context manager:

class TestMyApp(unittest.TestCase):
    def test_expensive_operation(self):
        with assert_query_count(1):
            perform_expensive_operation()  # Placeholder for the code under test.

Flask Utils

The playhouse.flask_utils module contains several helpers for integrating peewee with the Flask web framework.

Database Wrapper

The FlaskDB class is a wrapper for configuring and referencing a Peewee database from within a Flask application. Don’t let its name fool you: it is not the same thing as a peewee database. FlaskDB is designed to remove the following boilerplate from your flask app:

  • Dynamically create a Peewee database instance based on app config data.
  • Create a base class from which all your application’s models will descend.
  • Register hooks at the start and end of a request to handle opening and closing a database connection.

Basic usage:

import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB

DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'

app = Flask(__name__)
app.config.from_object(__name__)

db_wrapper = FlaskDB(app)

class User(db_wrapper.Model):
    username = CharField(unique=True)

class Tweet(db_wrapper.Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = DateTimeField(default=datetime.datetime.now)

The above code example will create and instantiate a peewee PostgresqlDatabase specified by the given database URL. Request hooks will be configured to establish a connection when a request is received, and automatically close the connection when the response is sent. Lastly, the FlaskDB class exposes a FlaskDB.Model property which can be used as a base for your application’s models.

Here is how you can access the wrapped Peewee database instance that is configured for you by the FlaskDB wrapper:

# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database

@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
    with peewee_db.atomic():
        # ...

    return jsonify({'transfer-id': xid})


The actual peewee database can be accessed using the FlaskDB.database attribute.

Here is another way to configure a Peewee database using FlaskDB:

app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')

While the above examples show using a database URL, for more advanced usages you can specify a dictionary of configuration options, or simply pass in a peewee Database instance:

DATABASE = {
    'name': 'my_app_db',
    'engine': 'playhouse.pool.PooledPostgresqlDatabase',
    'user': 'postgres',
    'max_connections': 32,
    'stale_timeout': 600,
}

app = Flask(__name__)
app.config.from_object(__name__)

wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database

Using a peewee Database object:

peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)

Database with Application Factory

If you prefer to use the application factory pattern, the FlaskDB class implements an init_app() method.

Using as a factory:

db_wrapper = FlaskDB()

# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
    username = CharField(unique=True)

def create_app():
    app = Flask(__name__)
    app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
    db_wrapper.init_app(app)
    return app

Query utilities

The flask_utils module provides several helpers for managing queries in your web app. Some common patterns include:

get_object_or_404(query_or_model, *query)
  • query_or_model – Either a Model class or a pre-filtered SelectQuery.
  • query – An arbitrarily complex peewee expression.

Retrieve the object matching the given query, or return a 404 not found response. A common use-case might be a detail page for a weblog. You want to either retrieve the post matching the given URL, or return a 404.


def post_detail(slug):
    public_posts = Post.select().where(Post.published == True)
    post = get_object_or_404(public_posts, (Post.slug == slug))
    return render_template('post_detail.html', post=post)
object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])
  • template_name – The name of the template to render.
  • query – A SelectQuery instance to paginate.
  • context_variable – The context variable name to use for the paginated object list.
  • paginate_by – Number of objects per-page.
  • page_var – The name of the GET argument which contains the page.
  • check_bounds – Whether to check that the given page is a valid page. If check_bounds is True and an invalid page is specified, then a 404 will be returned.
  • kwargs – Arbitrary key/value pairs to pass into the template context.

Retrieve a paginated list of objects specified by the given query. The paginated object list will be dropped into the context using the given context_variable, as well as metadata about the current page and total number of pages, and finally any arbitrary context data passed as keyword-arguments.

The page is specified using the page GET argument, e.g. /my-object-list/?page=3 would return the third page of objects.


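def post_index():
    public_posts = (Post
                    .select()
                    .where(Post.published == True))

    return object_list(
        'post_index.html',  # Template name assumed for illustration.
        query=public_posts,
        context_variable='post_list',
        paginate_by=10)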

The template will have the following context:

  • post_list, which contains a list of up to 10 posts.
  • page, which contains the current page based on the value of the page GET parameter.
  • pagination, a PaginatedQuery instance.
class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])
  • query_or_model – Either a Model or a SelectQuery instance containing the collection of records you wish to paginate.
  • paginate_by – Number of objects per-page.
  • page_var – The name of the GET argument which contains the page.
  • check_bounds – Whether to check that the given page is a valid page. If check_bounds is True and an invalid page is specified, then a 404 will be returned.

Helper class to perform pagination based on GET arguments.


get_page()

Return the currently selected page, as indicated by the value of the page_var GET parameter. If no page is explicitly selected, then this method will return 1, indicating the first page.

get_page_count()

Return the total number of possible pages.

get_object_list()

Using the value of get_page(), return the page of objects requested by the user. The return value is a SelectQuery with the appropriate LIMIT and OFFSET clauses.

If check_bounds was set to True and the requested page contains no objects, then a 404 will be raised.
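
The arithmetic behind page counts and the LIMIT/OFFSET clauses can be sketched in plain Python. This is an illustration of the idea only, not the module's code, and the two helper names are hypothetical.

```python
import math

def page_count(total_rows, paginate_by):
    """Number of pages needed to show total_rows items."""
    return int(math.ceil(total_rows / float(paginate_by)))

def limit_offset(page, paginate_by):
    """LIMIT and OFFSET values for a 1-based page number."""
    return paginate_by, (page - 1) * paginate_by

# 45 rows at 20 per page need three pages; page 3 starts at row 40.
pages = page_count(45, 20)
limit, offset = limit_offset(3, 20)
```

With check_bounds enabled, a request for a page greater than the page count would be the case that produces a 404.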