Python - 如何 - Big Query 异步任务

这可能是一个愚蠢的问题,但我似乎无法异步运行 python google-clood-bigquery。


我的目标是同时运行多个查询并等待所有asyncio.wait()查询在查询收集器中完成。我正在使用asyncio.create_tast()来启动查询。问题是每个查询在开始之前都等待前一个查询完成。


这是我的查询功能(很简单):


async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:

  job = self.api.query(query, **kwargs)

  return job.result()

既然我不能等待job.result(),我应该等待别的东西吗?


慕沐林林
浏览 168回答 3
3回答

慕标琳琳

如果您在 a 内部工作coroutine并希望在不阻塞的情况下运行不同的查询,event_loop那么您可以使用run_in_executor基本上在后台线程中运行查询而不阻塞循环的函数。这是如何使用它的一个很好的例子。确保这正是您所需要的;为在 Python API 中运行查询而创建的作业已经是异步的,它们仅在您调用job.result(). 这意味着asyncio除非您在协程内,否则您不需要使用。这是在作业完成后立即检索结果的快速示例:from concurrent.futures import ThreadPoolExecutor, as_completedimport google.cloud.bigquery as bqclient = bq.Client.from_service_account_json('path/to/key.json')query1 = 'SELECT 1'query2 = 'SELECT 2'threads = []results = []executor = ThreadPoolExecutor(5)for job in [client.query(query1), client.query(query2)]:    threads.append(executor.submit(job.result))# Here you can run any code you like. The interpreter is freefor future in as_completed(threads):    results.append(list(future.result()))results 将:[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

侃侃尔雅

事实上,由于该asyncio.create_task()函数,我找到了一种方法可以很容易地将我的查询包装在一个 asyinc 调用中。我只需要将它包装job.result()在一个协程中;这是实现。它现在异步运行。class BQApi(object):                                                                                                     def __init__(self):                                                                                                      self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                                   async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                               job = self.api.query(query, **kwargs)                                                                                task = asyncio.create_task(self.coroutine_job(job))                                                                  return await task                                                                                                @staticmethod                                                                                                        async def coroutine_job(job):                                                                                            return job.result()   
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python