在芹菜中对AsyncResult进行单元测试

我正在尝试在Django的单元测试框架中测试一些celery功能,但是每当我尝试检查AsyncResult时,测试的行为就好像从未启动过一样。


我知道这段代码可以在RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。


这是一个例子:


@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,

                   CELERY_ALWAYS_EAGER = True,

                   BROKER_BACKEND = 'memory',)

def test_celery_do_work(self):

    result = myapp.tasks.celery_do_work.AsyncResult('blat')

    applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')

    applied_task.wait()

    # THIS SUCCEEDS

    self.assertTrue(applied_task.successful())

    # THIS FAILS

    self.assertTrue(result.successful())

使用ALWAYS_EAGER选项是否会因为立即执行而禁用AsyncResult功能?如果是这样,有什么方法可以对AsyncResult状态检查进行单元测试?如果我尝试取出ALWAYS_EAGER选项,则测试将永远不会运行,因此我一头雾水。


慕妹3242003
浏览 244回答 1
1回答

哈士奇WWW

如果CELERY_ALWAYS_EAGER为is True,则对的呼叫apply_async()实际上被替换为apply()。返回的结果是EagerResult,其中已经包含任务的结果。因此,是的,设置ALWAYS_EAGER = True将禁用整个AsyncResult功能。整个异步过程将被绕过,并且实际上没有任何任务发送到代理,这就是为什么您无法通过检索结果的原因AsyncResult。CELERY_ALWAYS_EAGER = True在测试仅需要Celery结果的代码路径时使用,并与EagerResult或一起使用相同的方式AsyncResult。如果需要,还有运行与测试的方式AsyncResult也与CELERY_ALWAYS_EAGER = False,但对于这一点,你需要调用任务在你的测试用例之前启动工作。然后,工作人员将能够执行您的任务,并且AsyncResult可以正常工作。您可以看一下django-celery-testworker,它似乎就是这样做的,尽管我尚未对其进行测试。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python