示例:带有阻塞功能的消息队列
在构建应用程序的时候,我们有时候会遇到一些非常耗时的操作,比如发送邮件、将一条新微博同步给上百万个用户、对硬盘进行大量读写、执行庞大的计算等等。因为这些操作是如此耗时,所以如果我们直接在响应用户请求的过程中执行它们的话,那么用户就需要等待非常长时间。
比如说,为了验证用户身份的有效性,有些网站在注册新用户的时候,会向用户给定的邮件地址发送一封激活邮件,用户只有在点击了验证邮件里面的激活链接之后,新注册的帐号才能够正常使用。
下面这段伪代码展示了一个带有邮件验证功能的帐号注册函数,这个函数不仅会为用户输入的用户名和密码创建新帐号,还会向用户给定的邮件地址发送一封激活:
- def register(username, password, email):
- # 创建新帐号
- create_new_account(username, password)
- # 发送激活邮件
- send_validate_email(email)
- # 向用户返回注册结果
- ui_print("帐号注册成功,请访问你的邮箱并激活帐号。")
因为邮件发送操作需要进行复杂的网络信息交换,所以它并不是一个快速的操作,如果我们直接在 send_valid_email()
函数里面执行邮件发送操作的话,那么用户可能就需要等待一段较长的时间才能看到 ui_print()
函数打印出的反馈信息。
为了解决这个问题,在执行 send_validate_email()
函数的时候,我们可以不立即执行邮件发送操作,而是将邮件发送任务放入到一个队列里面,然后由后台的线程负责实际执行。这样的话,程序只需要执行一个入队操作,然后就可以直接向用户反馈注册结果了,这比实际地发送邮件之后再向用户反馈结果要快得多。
代码清单 4-4 展示了一个使用 Redis 实现的消息队列,它使用 RPUSH
命令将消息推入队列,并使用 BLPOP
命令从队列里面取出待处理的消息。
代码清单 4-4 使用列表实现的消息队列:/list/message_queue.py
- class MessageQueue:
- def __init__(self, client, queue_name):
- self.client = client
- self.queue_name = queue_name
- def add_message(self, message):
- """
- 将一条消息放入到队列里面。
- """
- self.client.rpush(self.queue_name, message)
- def get_message(self, timeout=0):
- """
- 从队列里面获取一条消息,
- 如果暂时没有消息可用,那么就在 timeout 参数指定的时限内阻塞并等待可用消息出现。
- timeout 参数的默认值为 0 ,表示一直等待直到消息出现为止。
- """
- # blpop 的结果可以是 None ,也可以是一个包含两个元素的元组
- # 元组的第一个元素是弹出元素的来源队列,而第二个元素则是被弹出的元素
- result = self.client.blpop(self.queue_name, timeout)
- if result is not None:
- source_queue, poped_item = result
- return poped_item
- def len(self):
- """
- 返回队列目前包含的消息数量。
- """
- return self.client.llen(self.queue_name)
为了使用这个消息队列,我们通常需要用到两个客户端:
一个客户端作为消息的发送者(sender),它需要将待处理的消息推入到队列里面;
而另一个客户端则作为消息的接收者(receiver)和消费者(consumer),它负责从队列里面取出消息,并根据消息内容进行相应的处理工作。
下面的这段代码展示了一个简单的消息接收者,在没有消息的时候,这个程序将阻塞在 mq.get_message()
调用上面;当有消息(邮件地址)出现时,程序就会打印出该消息并发送邮件:
- >>> from redis import Redis
- >>> from message_queue import MessageQueue
- >>> client = Redis(decode_responses=True)
- >>> mq = MessageQueue(client, 'validate user email queue')
- >>> while True:
- ... email_address = mq.get_message() # 阻塞直到消息出现
- ... send_email(email_address) # 打印出邮件地址并发送邮件
- ...
- peter@exampl.com
- jack@spam.com
- tom@blahblah.com
而以下代码则展示了消息发送者是如何将消息推入到队列里面的:
- >>> from redis import Redis
- >>> from message_queue import MessageQueue
- >>> client = Redis(decode_responses=True)
- >>> mq = MessageQueue(client, 'validate user email queue')
- >>> mq.add_message("peter@exampl.com")
- >>> mq.add_message("jack@spam.com")
- >>> mq.add_message("tom@blahblah.com")
阻塞弹出操作的应用
本节展示的消息队列之所以使用 BLPOP
命令而不是 LPOP
命令来实现出队操作,是因为阻塞弹出操作可以让消息接收者在队列为空的时候自动阻塞,而不必手动进行休眠,从而使得消息处理程序的编写变得更为简单直接,并且还可以有效地节约系统资源。
作为对比,以下代码展示了在使用 LPOP
命令实现出队操作的情况下,如何实现类似上面展示的消息处理程序:
- while True:
- # 尝试获取消息,如果没有消息,那么返回 None
- email_address = mq.get_message()
- if email_address is not None:
- # 有消息,发送邮件
- send_email(email_address)
- else:
- # 没有消息可用,休眠一百毫秒之后再试
- sleep(0.1)
因为缺少自动的阻塞操作,所以这个程序在没有取得消息的情况下,只能以一百毫秒一次的频率去尝试获取消息,如果队列为空的时间比较长,那么这个程序就会发送很多多余的 LPOP
命令,并因此浪费很多 CPU 资源和网络资源。
使用消息队列实现实时提醒
消息队列除了可以在应用程序的内部中使用,还可以用于实现面向用户的实时提醒系统。
比如说,如果我们在构建一个社交网站的话,那么可以使用 JavaScript 脚本,让客户端以异步的方式调用 MessageQueue
类的 get_message()
方法,然后程序就可以在用户被关注的时候、收到了新回复的时候又或者收到新私信的时候,通过调用 add_message()
方法来向用户发送提醒信息。