将python-trio中的信号量和时间限制与asks HTTP请求相结合

我正在尝试以异步方式使用Python,以便加快对服务器的请求。服务器的响应时间很慢(通常为几秒钟,但有时也快于一秒钟),但并行运行良好。我无权访问此服务器,也无法更改任何内容。因此,我有一个很大的URL列表(在下面的代码中pages),这些列表是我事先知道的,并且希望通过一次发出NO_TASKS=5请求来加快它们的加载速度。另一方面,我不想使服务器超载,因此我希望每个请求之间的最小间隔为1秒(即每秒1个请求的限制)。


到目前为止,我已经使用Trio队列成功实现了信号量部分(一次五个请求)。


import asks

import time

import trio


NO_TASKS = 5



asks.init('trio')

asks_session = asks.Session()

queue = trio.Queue(NO_TASKS)

next_request_at = 0

results = []



pages = [

    'https://www.yahoo.com/',

    'http://www.cnn.com',

    'http://www.python.org',

    'http://www.jython.org',

    'http://www.pypy.org',

    'http://www.perl.org',

    'http://www.cisco.com',

    'http://www.facebook.com',

    'http://www.twitter.com',

    'http://www.macrumors.com/',

    'http://arstechnica.com/',

    'http://www.reuters.com/',

    'http://abcnews.go.com/',

    'http://www.cnbc.com/',

]



async def async_load_page(url):

    global next_request_at

    sleep = next_request_at

    next_request_at = max(trio.current_time() + 1, next_request_at)

    await trio.sleep_until(sleep)

    next_request_at = max(trio.current_time() + 1, next_request_at)

    print('start loading page {} at {} seconds'.format(url, trio.current_time()))

    req = await asks_session.get(url)

    results.append(req.text)



async def producer(url):

    await queue.put(url)  



async def consumer():

    while True:

        if queue.empty():

            print('queue empty')

            return

        url = await queue.get()

        await async_load_page(url)



async def main():

    async with trio.open_nursery() as nursery:

        for page in pages:

            nursery.start_soon(producer, page)

        await trio.sleep(0.2)

        for _ in range(NO_TASKS):

            nursery.start_soon(consumer)



start = time.time()

trio.run(main)

但是,我缺少限制部分的实现,即max。的实现。每秒1个请求。您可以在我尝试这样做的上方看到(的前五行async_load_page)


繁花如伊
浏览 211回答 3
3回答

九州编程

trio.current_time()为此使用恕我直言太复杂了。进行速率限制的最简单方法是速率限制器,即基本上可以执行此操作的单独任务:async def ratelimit(queue,tick, task_status=trio.TASK_STATUS_IGNORED):    with trio.open_cancel_scope() as scope:        task_status.started(scope)        while True:            await queue.get()            await trio.sleep(tick)使用示例:async with trio.open_nursery() as nursery:    q = trio.Queue(0)    limiter = await nursery.start(ratelimit, q, 1)    while whatever:        await q.put(None) # will return at most once per second        do_whatever()    limiter.cancel()换句话说,您可以使用以下命令启动该任务q = trio.Queue(0)limiter = await nursery.start(ratelimit, q, 1)然后您可以确定最多await q.put(None)零长度队列充当集合点,因此每秒将返回。完成后,致电 limiter.cancel()停止限速任务,否则您的托儿所将不会退出。如果您的用例包括开始的子任务,您需要在取消限制器之前完成这些子任务,那么最简单的方法是将它们冲洗到另一个托儿所中,而不是while whatever:    await q.put(None) # will return at most once per second    do_whatever()limiter.cancel()你会用类似的东西async with trio.open_nursery() as inner_nursery:    await start_tasks(inner_nursery, q)limiter.cancel()它将在触摸限制器之前等待任务完成。注意:您可以轻松地将其调整为“突发”模式,即,只需增加队列的长度,就可以在速率限制生效之前允许一定数量的请求。

胡说叔叔

此解决方案的动机和由来自从我问了这个问题以来已经过去了几个月。从那时起,Python得到了改进,三人组(以及我对它们的了解)也有所改进。因此,我认为是时候使用带有类型注释和trio-0.10内存通道的Python 3.6进行一些更新了。我对原始版本进行了自己的改进,但是在阅读@Roman Novatorov的出色解决方案后,再次进行了调整,这就是结果。对于函数的主要结构(以及使用httpbin.org进行说明的想法)表示敬意。我选择使用内存通道而不是互斥锁,以便能够从工作程序中删除所有令牌重新释放逻辑。解决方案说明我可以这样改写原来的问题:我希望有许多工作人员彼此独立地启动请求(因此,它们将被实现为异步功能)。在任何时候都释放零或一个令牌;向服务器发起请求的任何工作人员都将消耗一个令牌,并且直到经过最短时间后才会发出下一个令牌。在我的解决方案中,我使用三重奏的内存通道来协调令牌发行者和令牌使用者(工人)之间的关系如果您不熟悉内存通道及其语法,可以在trio doc中阅读有关它们的信息。我想的逻辑async with memory_channel,并memory_channel.clone()能在第一时刻被混淆。from typing import List, Iteratorimport asksimport trioasks.init('trio')links: List[str] = [    'https://httpbin.org/delay/7',    'https://httpbin.org/delay/6',    'https://httpbin.org/delay/4'] * 3async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):    async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):        async with token_sender:            for _ in range(number_tokens):                await token_sender.send(None)                await trio.sleep(1 / throttle_rate)    async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):        async with token_receiver:            for url in url_iterator:                await token_receiver.receive()                print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')                response = await asks.get(url)                # print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')                responses.append(response)    responses = []    url_iterator = iter(urls)    token_send_channel, token_receive_channel = trio.open_memory_channel(0)    async with trio.open_nursery() as nursery:        async with token_receive_channel:            nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))            for _ in range(number_workers):                nursery.start_soon(worker, url_iterator, token_receive_channel.clone())    return responsesresponses = trio.run(fetch_urls, links, 5, 1.)日志输出示例:如您所见,所有页面请求之间的最短时间为一秒:[177878.99] Start loading link: https://httpbin.org/delay/7[177879.99] Start loading link: https://httpbin.org/delay/6[177880.99] Start loading link: https://httpbin.org/delay/4[177881.99] Start loading link: https://httpbin.org/delay/7[177882.99] Start loading link: https://httpbin.org/delay/6[177886.20] Start loading link: https://httpbin.org/delay/4[177887.20] Start loading link: https://httpbin.org/delay/7[177888.20] Start loading link: https://httpbin.org/delay/6[177889.44] Start loading link: https://httpbin.org/delay/4解决方案评论由于对于异步代码而言并非不常见,因此该解决方案不会保留请求的URL的原始顺序。解决此问题的一种方法是将id与原始url相关联,例如使用元组结构,将响应放入响应字典中,然后依次抓取响应以将其放入响应列表中(保存排序并具有线性复杂)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python