最近工作中有这么一种需求,需要定时将三种任务(假设任务为:A、B、C)分配到10台Windows Server中执行,而且这三种任务中还分有优先级的(为了简单就以每种任务分三种优先级为例吧)。很容易想到这不就是做一个异步调度嘛,找一个有优先级的消息队列就应该可以搞定了。可以后来发现目前Python这边的消息队列竟然主流不支持Windows,如:RQ、高版本的Celery,还有优先级支持也不是很好,于是乎打算自己造一个。
看了一些相关的博客,发现可以用Redis的list结构做队列,对于优先级的支持呢目前我是打算采用这种方式:
每一种任务每一种优先级都单独放一个队列存储(那么三种任务并且每种任务三个优先级别的话就需要9个Redis队列)。
上代码前先简单说明一下实现流程,其实主要就两个模块:入队、出队,说清楚这两块就OK了。
入队时,定时任务将A、B、C任务以及它们的优先级别传过来,接着我们对其进行判断,看各些任务进那些队列中(也就是各些任务在Redis队列中的键是什么)。我目前采用这么一种键的组合方式:任务类型-优先级(taskType-level),比如:A类型任务中优先级为1的任务最后进入的Redis队列的键为:A-1,那么优先级为100的B类型任务在Redis队列中的键也就为:B-100。简单弄了一张图,凑合着看吧。
image.png
到出队了,出队这边其实挺简单,第一种是:如果该Redis的DB下只有我们的任务,那么我们把所有的键取出来即可,取出来后可以对键按优先级排列(像SQL:order by level),或按任务类型和优先级排列(像SQL:order by taskType, level),排列后得到一个键的列表,再根据这个键的列表去pop任务即可。第二种是:我们可以配置某台客户端可执行的任务的类型,比如其中一台电脑我只想让它跑A类型任务。那我只给它配置A,这样让它去模式匹配Redis中的键(A-[0-9]*),这样取出来的就是A类型的所有优先级的任务了,如果想让它跑A、B任务就可以循环匹配嘛。
我也不知道有没有讲清楚这个流程,看代码吧(代码写得丑,萌新请各位大大多指教)
https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
# coding:utf-8import redisimport reimport jsonimport timefrom itertools import chainfrom datetime import datetime, dateclass ExpandJsonEncoder(json.JSONEncoder): ''' 采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展 ''' def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj)class MyRedisQueue: def __init__(self): self.redis_connect = redis.Redis() def get_len(self, key): keys = self.get_keys(key) # 每个键的任务数量 key_len = [(k, self.redis_connect.llen(k)) for k in keys] # 所有键的任务数量 task_len = sum(dict(key_len).values()) return task_len, key_len def get_keys(self, key): # Redis的键支持模式匹配 keys = self.redis_connect.keys(key + '-[0-9]*') # 按优先级将键降序排序 keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True) return keys def push_task(self, key, tasks, level=1): ''' 双端队列,左边推进任务 :param level: 优先级(int类型),数值越大优先级越高,默认1 :return: 任务队列任务数量 ''' # 重新定义优先队列的key new_key = key + '-' + str(level) # 序列化任务参数 tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks] print 'RedisQueue info > the number of push tasks:', len(tasks) if not tasks: return self.get_len(key) self.redis_connect.lpush(new_key, *tasks) return self.get_len(key) def pop_task(self, keys=None, priority=False): ''' 双端队列 右边弹出任务 :param keys: 键列表,默认为None(将获取所有任务的keys) :return: ''' while True: # 避免在while循环中修改参数,将keys参数赋值到临时变量 temp_keys = keys # 不指定keys,将获取所有任务 if not keys: temp_keys = self.redis_connect.keys() temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)])) # 根据key作为关键字获取所有的键 all_keys = list(chain(*[self.get_keys(k) for k in temp_keys])) # 屏蔽任务差异性,只按优先级高到低弹出任务 if priority: all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True) if all_keys: task_key, task = self.redis_connect.brpop(all_keys) return task_key, json.loads(task) time.sleep(2)if __name__ == '__main__': mrq = MyRedisQueue() # 把任务推入redis 队列 # lst = [i for i in xrange(0, 40)] # print mrq.push_task('C', lst, level=4) # 从redis queue取出任务 # while True: # task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True) # print task_type, task # time.sleep(1) # 查看任务数量以及优先级情况 # count, key_len = mrq.get_len('task') # print key_len
作者:wikizero
链接:https://www.jianshu.com/p/cf6cd407b276