示例:带有身份验证功能的计数信号量
本书前面介绍了如何使用锁去获得一项资源的独占使用权,并给出了几个不同的锁实现。但是除了独占一项资源之外,有时候我们也会想要让多个用户共享一项资源,只要共享者的数量不超过我们限制的数量即可。
举个例子,假设我们的系统有一项需要大量计算的操作,如果很多用户同时执行这项操作的话,那么系统的计算资源将会被耗尽。为了保证系统的正常运作,我们可以使用计数信号量来限制在同一时间内能够执行该操作的最大用户数量。
计数信号量(counter semaphore)跟锁非常相似,它们都可以限制资源的使用权,但是跟锁只允许单个客户端使用资源的做法不同,计数信号量允许多个客户端同时使用资源,只要这些客户端的数量不超过指定的限制即可。
代码清单 13-7 展示了一个带有身份验证功能的计数信号量实现:
这个程序会把所有成功取得信号量的客户端的标识符储存在格式为
semaphore::<name>::holders
的集合键里面,至于信号量的最大可获取数量则储存在格式为semaphore::<name>::max_size
的字符串键里面。在使用计数信号量之前,用户需要先通过
set_max_size()
方法设置计数信号量的最大可获取数量。get_max_size()
方法和get_current_size()
方法可以分别获取计数信号量的最大可获取数量以及当前已获取数量。获取信号量的
acquire()
方法是程序的核心:在获取信号量之前,程序会先使用两个GET
命令分别获取信号量的当前已获取数量以及最大可获取数量,如果信号量的当前已获取数量并未超过最大可获取数量,那么程序将执行SADD
命令,将客户端给定的标识符添加到holders
集合里面。由于
GET
命令执行之后直到SADD
命令执行之前的这段时间里,可能会有其他客户端抢先取得了信号量,并导致可用信号量数量发生变化。因此程序需要使用WATCH
命令监视holders
键,并使用事务包裹SADD
命令,以此通过乐观锁机制确保信号量获取操作的安全性。因为
max_size
键的值也会影响信号量获取操作的执行结果,并且这个键的值在SADD
命令执行之前也可能会被其他客户端修改,所以程序在监视holders
键的同时,也需要监视max_size
键。当客户端想要释放自己持有的信号量时,它只需要把自己的标识符传给
release()
方法即可:release()
方法将调用SREM
命令,从holders
集合中查找并移除客户端给定的标识符。
代码清单 13-7 计数信号量实现:/pipeline-and-transaction/semaphore.py
- from redis import WatchError
- class Semaphore:
- def __init__(self, client, name):
- self.client = client
- self.name = name
- # 用于储存信号量持有者标识符的集合
- self.holder_key = "semaphore::{0}::holders".format(name)
- # 用于记录信号量最大可获取数量的字符串
- self.size_key = "semaphore::{0}::max_size".format(name)
- def set_max_size(self, size):
- """
- 设置信号量的最大可获取数量。
- """
- self.client.set(self.size_key, size)
- def get_max_size(self):
- """
- 返回信号量的最大可获取数量。
- """
- result = self.client.get(self.size_key)
- if result is None:
- return 0
- else:
- return int(result)
- def get_current_size(self):
- """
- 返回目前已被获取的信号量数量。
- """
- return self.client.scard(self.holder_key)
- def acquire(self, identity):
- """
- 尝试获取一个信号量,成功时返回 True ,失败时返回 False 。
- 传入的 identity 参数将被用于标识客户端的身份。
- 如果调用该方法时信号量的最大可获取数量尚未被设置,那么引发一个 TypeError 。
- """
- # 开启流水线
- pipe = self.client.pipeline()
- try:
- # 监视与信号量有关的两个键
- pipe.watch(self.size_key, self.holder_key)
- # 取得当前已被获取的信号量数量,以及最大可获取的信号量数量
- current_size = pipe.scard(self.holder_key)
- max_size_in_str = pipe.get(self.size_key)
- if max_size_in_str is None:
- raise TypeError("Semaphore max size not set")
- else:
- max_size = int(max_size_in_str)
- if current_size < max_size:
- # 如果还有剩余的信号量可用
- # 那么将给定的标识符放入到持有者集合中
- pipe.multi()
- pipe.sadd(self.holder_key, identity)
- pipe.execute()
- return True
- else:
- # 没有信号量可用,获取失败
- return False
- except WatchError:
- # 获取过程中有其他客户端修改了 size_key 或者 holder_key ,获取失败
- return False
- finally:
- # 取消监视
- pipe.unwatch()
- # 将连接归还给连接池
- pipe.reset()
- def release(self, identity):
- """
- 根据给定的标识符,尝试释放当前客户端持有的信号量。
- 返回 True 表示释放成功,返回 False 表示由于标识符不匹配而导致释放失败。
- """
- # 尝试从持有者集合中移除给定的标识符
- result = self.client.srem(self.holder_key, identity)
- # 移除成功则说明信号量释放成功
- return result == 1
以下代码简单地展示了这个计数信号量的使用方法:
- >>> from redis import Redis
- >>> from semaphore import Semaphore
- >>> client = Redis(decode_responses=True)
- >>> semaphore = Semaphore(client, "test-semaphore") # 创建计数信号量
- >>> semaphore.set_max_size(3) # 设置信号量的最大可获取数量
- >>> semaphore.acquire("peter") # 获取信号量
- True
- >>> semaphore.acquire("jack")
- True
- >>> semaphore.acquire("tom")
- True
- >>> semaphore.acquire("mary") # 可用的三个信号量都已被获取,无法取得更多信号量
- False
- >>> semaphore.release("jack") # 释放一个信号量
- True
- >>> semaphore.get_current_size() # 目前有两个信号量已被获取
- 2
- >>> semaphore.get_max_size() # 信号量的最大可获取数量为三个
- 3