使用原始 PyMySQL 进行多线程处理 celery

在我目前正在进行的项目中,我不允许使用 ORM,所以我自己做了

它工作得很好,但我在 Celery 和它的并发性方面遇到了问题。有一段时间,我将其设置为1(using --concurrency=1),但我添加了新任务,这些任务的处理时间比使用 celerybeat 运行所需的时间要长,这会导致任务大量积压。

当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):

https://pastebin.com/M4HZXTDC

关于如何在其他进程上实现某种锁定/等待以使不同的工作人员不会相互交叉的任何想法?

编辑:这是我设置PyMySQL 实例以及如何处理打开和关闭的位置


子衿沉夜
浏览 98回答 1
1回答

鸿蒙传说

PyMSQL不允许线程共享同一个连接(模块可以共享,但线程不能共享连接)。您的模型类在各处重用相同的连接。因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。确保您的连接对象是线程本地的。不要使用db类属性,而是考虑一种检索线程本地连接对象的方法,而不是重用可能在不同线程中创建的连接对象。例如,在任务中创建连接。现在,您在每个模型的任何地方都使用全局连接。# Connect to the databaseconnection = pymysql.connect(**database_config)class Model(object):    """    Base Model class, all other Models will inherit from this    """    db = connection为了避免这种情况,您可以在方法中创建数据库__init__......class Model(object):    """    Base Model class, all other Models will inherit from this    """    def __init__(self, *args, **kwargs):        self.db = pymysql.connect(**database_config)但是,这可能不高效/不实用,因为 db 对象的每个实例都会创建一个会话。为了改进这一点,您可以使用一种方法threading.local来将连接保持在线程本地。class Model(object):    """    Base Model class, all other Models will inherit from this    """    _conn = threading.local()    @property    def db(self):        if not hasattr(self._conn, 'db'):            self._conn.db = pymysql.connect(**database_config)        return self._conn.db请注意,假设您使用线程并发模型,线程本地解决方案就可以工作。另请注意,celery 默认情况下使用多个进程(prefork)。这可能是问题,也可能不是问题。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python