我正在尝试在Django的单元测试框架中测试一些celery功能,但是每当我尝试检查AsyncResult时,测试的行为就好像从未启动过一样。
我知道这段代码可以在RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。
这是一个例子:
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
CELERY_ALWAYS_EAGER = True,
BROKER_BACKEND = 'memory',)
def test_celery_do_work(self):
result = myapp.tasks.celery_do_work.AsyncResult('blat')
applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')
applied_task.wait()
# THIS SUCCEEDS
self.assertTrue(applied_task.successful())
# THIS FAILS
self.assertTrue(result.successful())
使用ALWAYS_EAGER选项是否会因为立即执行而禁用AsyncResult功能?如果是这样,有什么方法可以对AsyncResult状态检查进行单元测试?如果我尝试取出ALWAYS_EAGER选项,则测试将永远不会运行,因此我一头雾水。
慕妹3242003
哈士奇WWW
随时随地看视频慕课网APP
相关分类