使用 SQLite 3

在 Flask 中可以方便地按需打开数据库连接,并在情境结束时(通常是请求结束时)关闭。

下面是一个如何在 Flask 中使用 SQLite 3 的例子:

  1. import sqlite3
  2. from flask import g
  3. DATABASE = '/path/to/database.db'
  4. def get_db():
  5. db = getattr(g, '_database', None)
  6. if db is None:
  7. db = g._database = sqlite3.connect(DATABASE)
  8. return db
  9. @app.teardown_appcontext
  10. def close_connection(exception):
  11. db = getattr(g, '_database', None)
  12. if db is not None:
  13. db.close()

现在,要使用数据库,应用必须要么有一个活动的应用情境(在存在请求的情况下,总会有一个),要么创建一个应用情境。在这种情况下, get_db 函数可以用于获得当前数据库连接。一旦情境灭失,数据库连接就会中断。

注意:如果使用 Flask 0.9 版或者更早版本,需要使用flask._app_ctx_stack.top 代替 g ,因为 flask.g对象绑定到请求而不是应用情境。

示例:

  1. @app.route('/')
  2. def index():
  3. cur = get_db().cursor()
  4. ...

Note

请牢记,拆卸请求( teardown request )和应用情境( appcontext )函数总是会执行,即使一个请求前处理器( before-request handler )失败或者没有执行也是如此。因此,我们在关闭数据库前应当确认数据库已经存在。

按需连接

在第一次使用时连接的好处是只会在真正需要的时候打开连接。如果需要在一个请求情境之外使用这个代码,可以在 Python shell 中手动打开应用情境后使用:

  1. with app.app_context():
  2. # now you can use get_db()

简化查询

现在每个请求处理函数中可以通过 get_db() 来得到当前打开的数据库连接。一个行工厂( row factory )可以简化 SQLite 的使用,它会在每个结果返回的时候对返回结果进行加工。例如,为了得到字典型而不是元组型的结果,以下内容可以插入到前文的 get_db 函数中:

  1. def make_dicts(cursor, row):
  2. return dict((cursor.description[idx][0], value)
  3. for idx, value in enumerate(row))
  4. db.row_factory = make_dicts

这样, sqlite3 模块就会返回方便处理的字典类型的结果了。更进一步,我们可以把以下内容放到 get_db 中:

  1. db.row_factory = sqlite3.Row

这样查询会返回 Row 对象,而不是字典。 Row 对象是namedtuple ,因此既可以通过索引访问也以通过键访问。例如,假设我们有一个sqlite3.Row 名为 r ,记录包含 idFirstNameLastNameMiddleInitial 字段:

  1. >>> # 基于键的名称取值
  2. >>> r['FirstName']
  3. John
  4. >>> # 或者基于索引取值
  5. >>> r[1]
  6. John
  7. # Row 对象是可迭代的:
  8. >>> for value in r:
  9. ... print(value)
  10. 1
  11. John
  12. Doe
  13. M

另外,提供一个函数,用于获得游标、执行查询和获取结果是一个好主意:

  1. def query_db(query, args=(), one=False):
  2. cur = get_db().execute(query, args)
  3. rv = cur.fetchall()
  4. cur.close()
  5. return (rv[0] if rv else None) if one else rv

这个方便称手的小函数与行工厂联合使用比使用原始的数据库游标和连接对象要方便多了。

使用该函数示例:

  1. for user in query_db('select * from users'):
  2. print user['username'], 'has the id', user['user_id']

只需要得到单一结果的用法:

  1. user = query_db('select * from users where username = ?',
  2. [the_username], one=True)
  3. if user is None:
  4. print 'No such user'
  5. else:
  6. print the_username, 'has the id', user['user_id']

如果要给 SQL 语句传递参数,请在语句中使用问号来代替参数,并把参数放在一个列表中一起传递。不要用字符串格式化的方式直接把参数加入 SQL 语句中,这样会给应用带来 SQL 注入 的风险。

初始化模式

关系数据库是需要模式的,因此一个应用常常需要一个 schema.sql 文件来创建数据库。因此我们需要使用一个函数,用来基于模式创建数据库。下面这个函数可以完成这个任务:

  1. def init_db():
  2. with app.app_context():
  3. db = get_db()
  4. with app.open_resource('schema.sql', mode='r') as f:
  5. db.cursor().executescript(f.read())
  6. db.commit()

接下来可以在 Python shell 中创建数据库:

  1. >>> from yourapplication import init_db
  2. >>> init_db()