Celery 一项任务启动许多小任务的最佳实践

我对芹菜有新手的经验。我写过很多任务,既有预定的也有延迟的,但仅此而已。


我遇到了一个问题,我想创建一个任务来启动 1000 个较小的作业,以减轻队列长度和可能需要数小时才能完成的作业可能出现的任何问题。


当前应用程序依赖于来自外部 API 的信息。可以这么说,用户将他们的帐户与我集成的另一项服务相关联,我想每天根据他们的外部帐户的变化来更新用户的信息。


我有这样的预定工作


@app.task() 

def refresh_accounts():

    for account in Account.objects.all():

        response = retrieve_account_info(account_id=account.id)

        account.data = response.data 

        account.save() 

--


我想要的是这样的


@app.task()

def kickoff_refresh():

    for account in Account.objects.all()

        refresh_account.delay(account_id=account.id)


@app.task() 

def refresh_account(account_id=None):

    account = Account.objects.get(id=account_id)

    response = retrieve_account_info(account_id=account.id)

    account.data = response.data 

    account.save()

我想到的一种方法是kickoff_refresh在refresh_account不同的队列中。@app.task(queue=q1), @app.task(queue=q2)... 但是,我不知道是否有更好的方法。在同一队列的任务中调用任务在 celery 中似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks任务kickoff_refresh将是每隔几个小时运行一次周期性任务。


我很想听听什么对其他人有用。谢谢


收到一只叮咚
浏览 112回答 1
1回答

幕布斯7119047

from celery import group@app.task()def kickoff_refresh(account_id=None):    job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()@app.task()def refresh_account(account_id=None):    account = Account.objects.get(id=account_id)    response = retrieve_account_info(account_id=account.id)    account.data = response.data     account.save()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python