简单例子:异步发送邮件
我要举的第一个示例是应用程序非常普通的需求:能够发送邮件但是不阻塞主应用。
在这个例子中我会用到 Flask-Mail 扩展,我会假设你们熟悉这个扩展。
我用来说明的示例应用是一个只有一个输入文本框的简单表单。要求用户在此文本框中输入一个电子邮件地址,并在提交,服务器会发送一个测试电子邮件到这个邮件地址。表单中包含两个提交按钮,一个立即发送邮件,一个是一分钟后发送邮件。表单的截图在文章开始。
这里就是支持这个示例的 HTML 模板:
<html>
<head>
<title>Flask + Celery Examples</title>
</head>
<body>
<h1>Flask + Celery Examples</h1>
<h2>Example 1: Send Asynchronous Email</h2>
{% for message in get_flashed_messages() %}
<p style="color: red;">{{ message }}</p>
{% endfor %}
<form method="POST">
<p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
<input type="submit" name="submit" value="Send">
<input type="submit" name="submit" value="Send in 1 minute">
</form>
</body>
</html>
这里没有什么特别的东西。只是一个普通的 HTML 表单,再加上 Flask 闪现消息。
Flask-Mail 扩展需要一些配置,尤其是电子邮件服务器发送邮件的时候会用到一些细节。为了简单我使用我的 Gmail 账号作为邮件服务器:
# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = '[email protected]'
注意为了避免我的账号丢失的风险,我将其设置在系统的环境变量,这是我从应用中导入的。
有一个单一的路由来支持这个示例:
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html', email=session.get('email', ''))
email = request.form['email']
session['email'] = email
# send the email
msg = Message('Hello from Flask',
recipients=[request.form['email']])
msg.body = 'This is a test email sent from a background Celery task.'
if request.form['submit'] == 'Send':
# send right away
send_async_email.delay(msg)
flash('Sending email to {0}'.format(email))
else:
# send in one minute
send_async_email.apply_async(args=[msg], countdown=60)
flash('An email will be sent to {0} in one minute'.format(email))
return redirect(url_for('index'))
再次说明,这是一个很标准的 Flask 应用。由于这是一个非常简单的表单,我决定在没有扩展的帮助下处理它,因此我用 request.method 和 request.form 来完成所有的管理。我保存用户在文本框中输入的值在 session 中,这样在页面重新加载后就能记住它。
在这个函数中让人有兴趣的是发送邮件的时候是通过调用一个叫做 send_async_email 的 Celery 任务,该任务调用 delay() 或者 apply_async() 方法。
这个应用的最后一部分就是能够完成作业的异步任务:
@celery.task
def send_async_email(msg):
"""Background task to send an email with Flask-Mail."""
with app.app_context():
mail.send(msg)
这个任务使用 celery.task 装饰使得成为一个后台作业。这个函数唯一值得注意的就是 Flask-Mail 需要在应用的上下文中运行,因此需要在调用 send() 之前创建一个应用上下文。
重点注意在这个示例中从异步调用返回值并不保留,因此应用不能知道调用成功或者失败。当你运行这个示例的时候,需要检查 Celery worker 的输出来排查发送邮件的问题。