Redis In Action 笔记(六):使用 Redis 作为应用程序组件

自动补全

程序说明

实现给一个组别中的成员发邮件时,输入前缀能找出匹配的收件人;程序假设所有名字是由字母组成。

设计思路

比如,查找abc前缀的名称:

  • → 查找范围是(abbz, abd)
  • → zrange取出该范围
  • → 但问题是这两个元素的可能不存在
  • → 向有序集合插入两个特殊元素,一个在abbz之后,一个在abd之前
  • → 根据这两个特殊元素取排名→根据排名取范围
  • → 已知:ASCII编码里面,z后面的第一个字符是 '{', a前面的第一个是反引号 ' ` '
  • →得到 abd 之前的且所有以 abc 为前缀的合法名字之后的元素「 abc{ 」 →作为查找的结束元素
  • →得到 abbz 之后的第一个元素是 abb{,这个元素位置 abc 之前→用作起始元素
  • →特殊情况:如果是查找的前缀是aba,则起始元素是ab`
  • → 由此得到查找的范围,即开始元素是前缀的最后一个字符后退一个ASCII字符 + ' { '(如果是a则变为' ` '), 结束元素是前缀直接加' { '

代码实现

import uuid
import bisect
import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

valid_characters = '`abcdefghijklmnopqrstuvwxyz{'             

# 获得起始和结束范围
def find_prefix_range(prefix):
    posn = bisect.bisect_left(valid_characters, prefix[-1:])  
    suffix = valid_characters[(posn or 1) - 1]                
    return prefix[:-1] + suffix + '{', prefix + '{'     

# 自动补全程序
def autocomplete_on_prefix(conn, guild, prefix):
    # 获取起始和结束字符
    start, end = find_prefix_range(prefix)                 
    identifier = str(uuid.uuid4()) 
    # 将开始和结束加上一个唯一标识符,避免其他用户同时操作时相互干扰                        
    start += identifier                                    
    end += identifier
    # 要查找的组                                      
    zset_name = 'members:' + guild

    # 向有序集合中插入起始和结束字符
    conn.zadd(zset_name, {start:0, end:0})                 
    pipeline = conn.pipeline(True)
    while 1:
        try:
            pipeline.watch(zset_name) 
            # 获得起始和结束元素的位置
            sindex = pipeline.zrank(zset_name, start)      
            eindex = pipeline.zrank(zset_name, end) 

            # 这里用于让取出的元素不超过10个       
            erange = min(sindex + 9, eindex - 2)           
            pipeline.multi()

            # 移除插入的两个元素
            pipeline.zrem(zset_name, start, end) 
            # 获得搜索结果          
            pipeline.zrange(zset_name, sindex, erange)     
            items = pipeline.execute()[-1]                 
            break
        except redis.exceptions.WatchError:                
            continue                                       
    # 移除可能由其他客户端插入的用于搜索的元素(含有{)
    return [item.decode('utf-8') for item in items if b'{' not in item]   

# 加入小组
def join_guild(conn, guild, user):
    conn.zadd('members:' + guild, {user:0})

# 退出小组
def leave_guild(conn, guild, user):
    conn.zrem('members:' + guild, user)   

# 测试运行
join_guild(conn, 'aa', 'abc')    
join_guild(conn, 'aa', 'abcd')    
join_guild(conn, 'aa', 'abcz')    
join_guild(conn, 'aa', 'abcx')    
join_guild(conn, 'aa', 'abcy')    
join_guild(conn, 'aa', 'abci')    
join_guild(conn, 'aa', 'abcj')    
join_guild(conn, 'aa', 'abd')  
items = autocomplete_on_prefix(conn, 'aa', 'abc')  
print(items)
# 输出: ['abc', 'abcd', 'abci', 'abcj', 'abcx', 'abcy', 'abcz']

有序集合常规用途

  • 快速判断某个元素是否存在于集合里面(ZSCORE key member,不存在这返回nil)
  • 查看某个成员在有序集合中的位置或索引(ZRANK key-name member)
  • 取出某个范围的元素(ZRANGE key-name start offset [WITHSCORES])

这里的用法

所有score设为0,当所有成员分值都相同时,按照成员的名字来进行排序; 而当所有成员分值都是0的时候,成员按照字符串的二进制进行排序。

分布式锁

实现一个分布式锁,核心的命令是:setnx(SET if Not eXists),它的作用是当设置一个KEY时,如果该KEY不存在,才能设置成功。那么当一个客户端设置一个KEY成功,并将其值设置为一个唯一标识符,其他客户端使用同样的命令则会设置失败,也就是说,其他客户端在该KEY存在的周期内,是无法成功设置该KEY的值的(程序上返回false),这相当于设置该KEY的值成功的客户端获得了锁,当该客户端使用完毕并将其删除,其他客户端才能够设置该KEY。

当系统负载较大的时候,WATCH锁导致程序进行重试的次数剧增,分布式锁无此问题。

程序实现

 # 获取锁
def acquire_lock(conn, lockname, acquire_timeout=10):
    identifier = str(uuid.uuid4())                      
    end = time.time() + acquire_timeout
    # 在超时时间内不停重试,直到设置KEY的值成功,也即获得了「锁」
    while time.time() < end:
        if conn.setnx('lock:' + lockname, identifier):
            # 加锁成功返回标识符  
            return identifier
        time.sleep(.001)
    return False

# 4.4.3节中的函数改写
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
    buyer = "users:%s" % buyerid
    seller = "users:%s" % sellerid
    item = "%s.%s" % (itemid, sellerid)
    inventory = "inventory:%s" % buyerid

    # 加锁(这里对整个市场加锁)
    # 也可以改变锁的粒度,只对操作到的商品加锁
    locked = acquire_lock(conn, 'market:')     
    if not locked:
        return False

    pipe = conn.pipeline(True)
    try:
        pipe.zscore("market:", item)           
        pipe.hget(buyer, 'funds')              
        price, funds = pipe.execute()          
        if price is None or price > funds:     
            return None                        

        pipe.hincrby(seller, 'funds', int(price))  
        pipe.hincrby(buyer, 'funds', int(-price))  
        pipe.sadd(inventory, itemid)               
        pipe.zrem("market:", item)                 
        pipe.execute()                             
        return True
    finally:
        # 释放锁
        release_lock(conn, 'market:', locked) 

# 释放锁
def release_lock(conn, lockname, identifier):
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    while True:
        try:
            pipe.watch(lockname)                
            if pipe.get(lockname) == identifier:
                pipe.multi()                    
                pipe.delete(lockname)           
                pipe.execute()                  
                return True                     

            pipe.unwatch()
            break

        except redis.exceptions.WatchError:     
            pass                                

    return False

带超时的锁

可以取得锁之后,给锁加一个超时时间,可以避免因为不可预料的情况,持有者一直持有锁。

acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
    identifier = str(uuid.uuid4())                      
    lockname = 'lock:' + lockname
    lock_timeout = int(math.ceil(lock_timeout))         

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx(lockname, identifier): 
            # 给锁设置超时时间           
            conn.expire(lockname, lock_timeout)         
            return identifier
        # 如果锁已存在,顺便设置下超时时间
        elif conn.ttl(lockname) < 0:                    
            conn.expire(lockname, lock_timeout)         

        time.sleep(.001)

    return False 

计数信号量

  • 计数信号量是一种锁,用于限制访问同一资源的进程数

  • 可以把上一节创建的锁看成只能被一个进程访问的计数信号量

  • 同样有获取锁、释放锁的操作,但获取锁失败时,倾向于立即返回失败结果,而不是重试

  • 实现超时限制特性:1.使用EXPIRE    2.使用有序集合

    例子

    存储信号量信息的有序集合:
    Redis In Action 笔记(六):使用 Redis 作为应用程序组件

  • 占坑

    def acquire_semaphore(conn, semname, limit, timeout=10):
    identifier = str(uuid.uuid4())                             
    now = time.time()
    
    pipeline = conn.pipeline(True)
    # 清理超时的坑位
    pipeline.zremrangebyscore(semname, '-inf', now - timeout)  
    pipeline.zadd(semname, {identifier::now}) # 占个坑先                   
    pipeline.zrank(semname, identifier)  # 获取位置                      
    if pipeline.execute()[-1] < limit:   # 如果坑位还足够,就抢占成功                     
        return identifier 
    
    # 坑位不足,占不到坑,就只能灰溜溜地走了
    conn.zrem(semname, identifier)                             
    return None  
  • 释放坑位

    def release_semaphore(conn, semname, identifier):
    return conn.zrem(semname, identifier)  

    公平信号量

    解决系统时间可能不一致,导致系统时间较慢的客户端能抢先占有信号量的问题。
    实现思路:新增一个计数器,再新增一个存储计数信号量信息的有序集合(A),这个有序集合以最新的计数器的值为score,还需要另一个存储计数信号量的有序集(B),score值为时间戳,用来清理超时的信号量。B清理调超时信号量后,与A求交集(zinterstore),B的聚合权重设为0,最终结果保存到A,这样一来,A通过B间接地清理掉超时信号量。实现代码如下:

  • 取得计数信号量

    def acquire_fair_semaphore(conn, semname, limit, timeout=10):
    identifier = str(uuid.uuid4())                             
    czset = semname + ':owner'  # 信号量集合(B)
    ctr = semname + ':counter'  # 计数器
    
    now = time.time()
    pipeline = conn.pipeline(True)
    # 删除(A)中超时的,然后与czset(B)求交集来间接删除czset中超时的
    pipeline.zremrangebyscore(semname, '-inf', now - timeout) 
    # 求交集,semname聚合权重为0,这样不影响(A)的score值 
    pipeline.zinterstore(czset, {czset: 1, semname: 0})        
    
    pipeline.incr(ctr)                                         
    counter = pipeline.execute()[-1]                           
    
    pipeline.zadd(semname, identifier, now) # 以时间戳为score的zset                   
    pipeline.zadd(czset, identifier, counter)  # 以计数为score的zset                 
    
    pipeline.zrank(czset, identifier)  # 获取位置                        
    if pipeline.execute()[-1] < limit:                         
        return identifier                                      
    
    pipeline.zrem(semname, identifier)                         
    pipeline.zrem(czset, identifier)                           
    pipeline.execute()
    return None 
  • 释放锁

    比非公平信号量多了一个集合要删除。

    def release_fair_semaphore(conn, semname, identifier):
    pipeline = conn.pipeline(True)
    pipeline.zrem(semname, identifier)
    pipeline.zrem(semname + ':owner', identifier)
    return pipeline.execute()[0] 
  • 刷新信号量

    前面设置的计数信号量10s就超时,需要进行刷新,防止过期

    # 刷新信号量            
    def refresh_fair_semaphore(conn, semname, identifier):
    # zadd操作,元素不存在时执行添加操作,返回1;存在时则更新元素,返回0
    if conn.zadd(semname, identifier, time.time()): 
        # 如果是添加,将判断语句添加的删除,返回False,表示已过期           
        release_fair_semaphore(conn, semname, identifier)      
        return False 
    
    # 如果是更新,直接跳到这里,返回True,表示刷新成功                                         
    return True 
  • 获取信号量之加锁版

    用于消除竞争条件

    def acquire_semaphore_with_lock(conn, semname, limit, timeout=10):
    # 设置一个超短时间过期的锁
    identifier = acquire_lock(conn, semname, acquire_timeout=.01)
    # 如果设置成功
    if identifier:
        try:
            # 获取公平信号量
            return acquire_fair_semaphore(conn, semname, limit, timeout)
        finally:
            # 将获得的锁删除
            release_lock(conn, semname, identifier)
  • 计数信号量总结

    • 如果对系统的时钟差异可以接受,可以使用第一种
    • 不能接受系统的时钟差异,可以使用公平信号量
    • 希望信号量一直运行正确,使用第三种,加锁获取信号量(推荐)

任务队列

import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

# 已售出商品邮件队列
def send_sold_email_via_queue(conn, seller, item, price, buyer):
    data = {
        'seller_id': seller,                    
        'item_id': item,                        
        'price': price,                         
        'buyer_id': buyer,                      
        'time': time.time()                     
    }
    conn.rpush('queue:email', json.dumps(data)) 

# 读取列表中的发邮件任务,发送邮件
def process_sold_email_queue(conn):
    while not QUIT:
        packed = conn.blpop(['queue:email'], 30)                  
        if not packed:                                            
            continue                                              

        to_send = json.loads(packed[1])                           
        try:
            fetch_data_and_send_sold_email(to_send)               
        except EmailSendError as err:
            log_error("Failed to send sold email", err, to_send)
        else:
            log_success("Sent sold email", to_send)

# 可以执行多种任务的队列
def worker_watch_queue(conn, queue, callbacks):
    while not QUIT:
        packed = conn.blpop([queue], 30)                    
        if not packed:                                      
            continue                                        

        name, args = json.loads(packed[1]) 
        # 如果任务没有在callbacks中注册                 
        if name not in callbacks:                           
            log_error("Unknown callback %s"%name)           
            continue 
        # 调用在callbacks中已注册的函数                                       
        callbacks[name](*args)   

# 任务优先级 
# 实现原理:blpop可传入多个list,排在前面的list优先处理
def worker_watch_queues(conn, queues, callbacks):   
    while not QUIT:
        packed = conn.blpop(queues, 30)  # queues是由多个list组成的list           
        if not packed:
            continue

        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error("Unknown callback %s"%name)
            continue
        callbacks[name](*args)   

# 延迟任务
# 实现思路:把所有要延迟的任务加到有序集合,执行时间作为score
# 另外的一个进程用来查找有序集合里面是否有要立即执行的任务
# 如果有的话,就从有序集合移除,添加到队列中
def execute_later(conn, queue, name, args, delay=0):
    identifier = str(uuid.uuid4())                          
    item = json.dumps([identifier, queue, name, args])      
    if delay > 0: # 添加到有序集合 延迟任务执行
        conn.zadd('delayed:', item, time.time() + delay)    
    else: # 立即执行的任务
        conn.rpush('queue:' + queue, item)                  
    return identifier    

# 执行延迟任务
def poll_queue(conn):
    while not QUIT:
        # 取出第一个元素
        item = conn.zrange('delayed:', 0, 0, withscores=True) 
        # 如果没有元素 休眠0.01s  
        if not item or item[0][1] > time.time():                
            time.sleep(.01)                                     
            continue                                            

        # 取出要执行的任务
        item = item[0][0]                                       
        identifier, queue, function, args = json.loads(item)    

        locked = acquire_lock(conn, identifier)                 
        if not locked:                                          
            continue                                            
        # 从有序集合移除该任务 并 添加到任务队列
        if conn.zrem('delayed:', item):                         
            conn.rpush('queue:' + queue, item)                  

        release_lock(conn, identifier, locked)                                                                              

参考资料:https://redislabs.com/ebook/part-2-core-co...

本作品采用《CC 协议》,转载必须注明作者和本文链接
Was mich nicht umbringt, macht mich stärker
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!