Redis In Action 笔记(六):使用 Redis 作为应用程序组件
自动补全
程序说明
实现给一个组别中的成员发邮件时,输入前缀能找出匹配的收件人;程序假设所有名字是由字母组成。
设计思路
比如,查找abc前缀的名称:
- → 查找范围是(abbz, abd)
- → zrange取出该范围
- → 但问题是这两个元素的可能不存在
- → 向有序集合插入两个特殊元素,一个在abbz之后,一个在abd之前
- → 根据这两个特殊元素取排名→根据排名取范围
- → 已知:ASCII编码里面,z后面的第一个字符是 '{', a前面的第一个是反引号 ' ` '
- →得到 abd 之前的且所有以 abc 为前缀的合法名字之后的元素「 abc{ 」 →作为查找的结束元素
- →得到 abbz 之后的第一个元素是 abb{,这个元素位置 abc 之前→用作起始元素
- →特殊情况:如果是查找的前缀是aba,则起始元素是ab`
- → 由此得到查找的范围,即开始元素是前缀的最后一个字符后退一个ASCII字符 + ' { '(如果是a则变为' ` '), 结束元素是前缀直接加' { '
代码实现
import uuid
import bisect
import redis
conn = redis.Redis(host='127.0.0.1', port=6379)
valid_characters = '`abcdefghijklmnopqrstuvwxyz{'
# 获得起始和结束范围
def find_prefix_range(prefix):
posn = bisect.bisect_left(valid_characters, prefix[-1:])
suffix = valid_characters[(posn or 1) - 1]
return prefix[:-1] + suffix + '{', prefix + '{'
# 自动补全程序
def autocomplete_on_prefix(conn, guild, prefix):
# 获取起始和结束字符
start, end = find_prefix_range(prefix)
identifier = str(uuid.uuid4())
# 将开始和结束加上一个唯一标识符,避免其他用户同时操作时相互干扰
start += identifier
end += identifier
# 要查找的组
zset_name = 'members:' + guild
# 向有序集合中插入起始和结束字符
conn.zadd(zset_name, {start:0, end:0})
pipeline = conn.pipeline(True)
while 1:
try:
pipeline.watch(zset_name)
# 获得起始和结束元素的位置
sindex = pipeline.zrank(zset_name, start)
eindex = pipeline.zrank(zset_name, end)
# 这里用于让取出的元素不超过10个
erange = min(sindex + 9, eindex - 2)
pipeline.multi()
# 移除插入的两个元素
pipeline.zrem(zset_name, start, end)
# 获得搜索结果
pipeline.zrange(zset_name, sindex, erange)
items = pipeline.execute()[-1]
break
except redis.exceptions.WatchError:
continue
# 移除可能由其他客户端插入的用于搜索的元素(含有{)
return [item.decode('utf-8') for item in items if b'{' not in item]
# 加入小组
def join_guild(conn, guild, user):
conn.zadd('members:' + guild, {user:0})
# 退出小组
def leave_guild(conn, guild, user):
conn.zrem('members:' + guild, user)
# 测试运行
join_guild(conn, 'aa', 'abc')
join_guild(conn, 'aa', 'abcd')
join_guild(conn, 'aa', 'abcz')
join_guild(conn, 'aa', 'abcx')
join_guild(conn, 'aa', 'abcy')
join_guild(conn, 'aa', 'abci')
join_guild(conn, 'aa', 'abcj')
join_guild(conn, 'aa', 'abd')
items = autocomplete_on_prefix(conn, 'aa', 'abc')
print(items)
# 输出: ['abc', 'abcd', 'abci', 'abcj', 'abcx', 'abcy', 'abcz']
有序集合常规用途
- 快速判断某个元素是否存在于集合里面(ZSCORE key member,不存在这返回nil)
- 查看某个成员在有序集合中的位置或索引(ZRANK key-name member)
- 取出某个范围的元素(ZRANGE key-name start offset [WITHSCORES])
这里的用法
所有score设为0,当所有成员分值都相同时,按照成员的名字来进行排序; 而当所有成员分值都是0的时候,成员按照字符串的二进制进行排序。
分布式锁
实现一个分布式锁,核心的命令是:setnx(SET if Not eXists),它的作用是当设置一个KEY时,如果该KEY不存在,才能设置成功。那么当一个客户端设置一个KEY成功,并将其值设置为一个唯一标识符,其他客户端使用同样的命令则会设置失败,也就是说,其他客户端在该KEY存在的周期内,是无法成功设置该KEY的值的(程序上返回false),这相当于设置该KEY的值成功的客户端获得了锁,当该客户端使用完毕并将其删除,其他客户端才能够设置该KEY。
当系统负载较大的时候,WATCH锁导致程序进行重试的次数剧增,分布式锁无此问题。
程序实现
# 获取锁
def acquire_lock(conn, lockname, acquire_timeout=10):
identifier = str(uuid.uuid4())
end = time.time() + acquire_timeout
# 在超时时间内不停重试,直到设置KEY的值成功,也即获得了「锁」
while time.time() < end:
if conn.setnx('lock:' + lockname, identifier):
# 加锁成功返回标识符
return identifier
time.sleep(.001)
return False
# 4.4.3节中的函数改写
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
buyer = "users:%s" % buyerid
seller = "users:%s" % sellerid
item = "%s.%s" % (itemid, sellerid)
inventory = "inventory:%s" % buyerid
# 加锁(这里对整个市场加锁)
# 也可以改变锁的粒度,只对操作到的商品加锁
locked = acquire_lock(conn, 'market:')
if not locked:
return False
pipe = conn.pipeline(True)
try:
pipe.zscore("market:", item)
pipe.hget(buyer, 'funds')
price, funds = pipe.execute()
if price is None or price > funds:
return None
pipe.hincrby(seller, 'funds', int(price))
pipe.hincrby(buyer, 'funds', int(-price))
pipe.sadd(inventory, itemid)
pipe.zrem("market:", item)
pipe.execute()
return True
finally:
# 释放锁
release_lock(conn, 'market:', locked)
# 释放锁
def release_lock(conn, lockname, identifier):
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
if pipe.get(lockname) == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute()
return True
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
带超时的锁
可以取得锁之后,给锁加一个超时时间,可以避免因为不可预料的情况,持有者一直持有锁。
acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
identifier = str(uuid.uuid4())
lockname = 'lock:' + lockname
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
if conn.setnx(lockname, identifier):
# 给锁设置超时时间
conn.expire(lockname, lock_timeout)
return identifier
# 如果锁已存在,顺便设置下超时时间
elif conn.ttl(lockname) < 0:
conn.expire(lockname, lock_timeout)
time.sleep(.001)
return False
计数信号量
-
计数信号量是一种锁,用于限制访问同一资源的进程数
-
可以把上一节创建的锁看成只能被一个进程访问的计数信号量
-
同样有获取锁、释放锁的操作,但获取锁失败时,倾向于立即返回失败结果,而不是重试
-
实现超时限制特性:1.使用EXPIRE 2.使用有序集合
例子
存储信号量信息的有序集合:
-
占坑
def acquire_semaphore(conn, semname, limit, timeout=10): identifier = str(uuid.uuid4()) now = time.time() pipeline = conn.pipeline(True) # 清理超时的坑位 pipeline.zremrangebyscore(semname, '-inf', now - timeout) pipeline.zadd(semname, {identifier::now}) # 占个坑先 pipeline.zrank(semname, identifier) # 获取位置 if pipeline.execute()[-1] < limit: # 如果坑位还足够,就抢占成功 return identifier # 坑位不足,占不到坑,就只能灰溜溜地走了 conn.zrem(semname, identifier) return None
-
释放坑位
def release_semaphore(conn, semname, identifier): return conn.zrem(semname, identifier)
公平信号量
解决系统时间可能不一致,导致系统时间较慢的客户端能抢先占有信号量的问题。
实现思路:新增一个计数器,再新增一个存储计数信号量信息的有序集合(A),这个有序集合以最新的计数器的值为score,还需要另一个存储计数信号量的有序集(B),score值为时间戳,用来清理超时的信号量。B清理调超时信号量后,与A求交集(zinterstore),B的聚合权重设为0,最终结果保存到A,这样一来,A通过B间接地清理掉超时信号量。实现代码如下: -
取得计数信号量
def acquire_fair_semaphore(conn, semname, limit, timeout=10): identifier = str(uuid.uuid4()) czset = semname + ':owner' # 信号量集合(B) ctr = semname + ':counter' # 计数器 now = time.time() pipeline = conn.pipeline(True) # 删除(A)中超时的,然后与czset(B)求交集来间接删除czset中超时的 pipeline.zremrangebyscore(semname, '-inf', now - timeout) # 求交集,semname聚合权重为0,这样不影响(A)的score值 pipeline.zinterstore(czset, {czset: 1, semname: 0}) pipeline.incr(ctr) counter = pipeline.execute()[-1] pipeline.zadd(semname, identifier, now) # 以时间戳为score的zset pipeline.zadd(czset, identifier, counter) # 以计数为score的zset pipeline.zrank(czset, identifier) # 获取位置 if pipeline.execute()[-1] < limit: return identifier pipeline.zrem(semname, identifier) pipeline.zrem(czset, identifier) pipeline.execute() return None
-
释放锁
比非公平信号量多了一个集合要删除。
def release_fair_semaphore(conn, semname, identifier): pipeline = conn.pipeline(True) pipeline.zrem(semname, identifier) pipeline.zrem(semname + ':owner', identifier) return pipeline.execute()[0]
-
刷新信号量
前面设置的计数信号量10s就超时,需要进行刷新,防止过期
# 刷新信号量 def refresh_fair_semaphore(conn, semname, identifier): # zadd操作,元素不存在时执行添加操作,返回1;存在时则更新元素,返回0 if conn.zadd(semname, identifier, time.time()): # 如果是添加,将判断语句添加的删除,返回False,表示已过期 release_fair_semaphore(conn, semname, identifier) return False # 如果是更新,直接跳到这里,返回True,表示刷新成功 return True
-
获取信号量之加锁版
用于消除竞争条件
def acquire_semaphore_with_lock(conn, semname, limit, timeout=10): # 设置一个超短时间过期的锁 identifier = acquire_lock(conn, semname, acquire_timeout=.01) # 如果设置成功 if identifier: try: # 获取公平信号量 return acquire_fair_semaphore(conn, semname, limit, timeout) finally: # 将获得的锁删除 release_lock(conn, semname, identifier)
-
计数信号量总结
- 如果对系统的时钟差异可以接受,可以使用第一种
- 不能接受系统的时钟差异,可以使用公平信号量
- 希望信号量一直运行正确,使用第三种,加锁获取信号量(推荐)
任务队列
import redis
conn = redis.Redis(host='127.0.0.1', port=6379)
# 已售出商品邮件队列
def send_sold_email_via_queue(conn, seller, item, price, buyer):
data = {
'seller_id': seller,
'item_id': item,
'price': price,
'buyer_id': buyer,
'time': time.time()
}
conn.rpush('queue:email', json.dumps(data))
# 读取列表中的发邮件任务,发送邮件
def process_sold_email_queue(conn):
while not QUIT:
packed = conn.blpop(['queue:email'], 30)
if not packed:
continue
to_send = json.loads(packed[1])
try:
fetch_data_and_send_sold_email(to_send)
except EmailSendError as err:
log_error("Failed to send sold email", err, to_send)
else:
log_success("Sent sold email", to_send)
# 可以执行多种任务的队列
def worker_watch_queue(conn, queue, callbacks):
while not QUIT:
packed = conn.blpop([queue], 30)
if not packed:
continue
name, args = json.loads(packed[1])
# 如果任务没有在callbacks中注册
if name not in callbacks:
log_error("Unknown callback %s"%name)
continue
# 调用在callbacks中已注册的函数
callbacks[name](*args)
# 任务优先级
# 实现原理:blpop可传入多个list,排在前面的list优先处理
def worker_watch_queues(conn, queues, callbacks):
while not QUIT:
packed = conn.blpop(queues, 30) # queues是由多个list组成的list
if not packed:
continue
name, args = json.loads(packed[1])
if name not in callbacks:
log_error("Unknown callback %s"%name)
continue
callbacks[name](*args)
# 延迟任务
# 实现思路:把所有要延迟的任务加到有序集合,执行时间作为score
# 另外的一个进程用来查找有序集合里面是否有要立即执行的任务
# 如果有的话,就从有序集合移除,添加到队列中
def execute_later(conn, queue, name, args, delay=0):
identifier = str(uuid.uuid4())
item = json.dumps([identifier, queue, name, args])
if delay > 0: # 添加到有序集合 延迟任务执行
conn.zadd('delayed:', item, time.time() + delay)
else: # 立即执行的任务
conn.rpush('queue:' + queue, item)
return identifier
# 执行延迟任务
def poll_queue(conn):
while not QUIT:
# 取出第一个元素
item = conn.zrange('delayed:', 0, 0, withscores=True)
# 如果没有元素 休眠0.01s
if not item or item[0][1] > time.time():
time.sleep(.01)
continue
# 取出要执行的任务
item = item[0][0]
identifier, queue, function, args = json.loads(item)
locked = acquire_lock(conn, identifier)
if not locked:
continue
# 从有序集合移除该任务 并 添加到任务队列
if conn.zrem('delayed:', item):
conn.rpush('queue:' + queue, item)
release_lock(conn, identifier, locked)
本作品采用《CC 协议》,转载必须注明作者和本文链接