在多线程环境下使用DataFrame.to_sql时MySQL死锁

我在 docker 容器内有一个多线程 ETL 进程,看起来像这样的简化代码:


class Query(abc.ABC):

    def __init__(self):

        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)


    def load(self, df: pd.DataFrame) -> None:

        df.to_sql(

            name=self.table, con=self.connection, if_exists="replace", index=False,

        )


    @abc.abstractmethod

    def transform(self, data: object) -> pd.DataFrame:

        pass


    @abc.abstractmethod

    def extract(self) -> object:

        pass


    #  other methods...



class ComplianceInfluxQuery(Query):

    # Implements abstract methods... load method is the same as Query class



ALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"),ComplianceInfluxQuery("cc_influx_we_count")....]



while True:

    with ThreadPoolExecutor(max_workers=8) as pool:

        for query in ALL_QUERIES:

            pool.submit(execute_etl, query) # execute_etl function calls extract, transform and load


load()许多类继承自 Query,具有与类中所示相同的实现,Query它只是将 pandas DataFrame 对象加载到 sql 表中,并替换该表(如果存在)。


所有类同时运行,并在完成后将结果加载到 MySQLExtract()数据库Transform()。每个类都会将不同的表加载到数据库中。


load()当调用该方法时,我经常会从随机线程中遇到死锁:

日志显示了load()两个线程几乎同时调用的方法。无论数据如何,这种情况都可能发生在所有类中。

我运行了命令SHOW ENGINE INNODB STATUS,那里没有列出死锁。

cc_influx_share_count我检查了 General_log 表以更好地了解死锁期间发生的情况,但除了死锁的线程在(我认为)应该具有的情况下没有向表中插入任何值这一事实之外,没有注意到任何有用的信息:

  • 该错误于 09:48:28,241 提出

SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;

此 ETL 是运行 MySQL 的唯一进程。我已阅读有关死锁发生原因的文档,但我无法理解两个之间没有连接的不同表如何导致死锁。我知道我可以简单地load()再次运行该方法直到成功,但我想了解为什么会发生死锁以及如何防止它们。

MySQL版本是8.0.21。蟒蛇3.8.4。sqlalchemy 1.3.19。熊猫 1.0.5。PyMySQL 0.10.1。


烙印99
浏览 160回答 2
2回答

拉丁的传说

如果多个连接尝试同时 INSERT 或 UPDATE 到同一个表,则可能会因表索引的争用而导致死锁。你的问题说你从多个线程执行 INSERT。执行 INSERT 需要检查主键唯一性和外键有效性等约束,然后更新表示这些约束的索引。所以多个并发更新必须锁定索引以供读取,然后锁定它们以进行写入。从你的问题来看,MySQL有时会陷入死锁情况(一个线程按a,b顺序锁定索引,另一个线程按b,a顺序锁定索引)。如果不同的线程可以同时将行插入到不同的表中,并且这些表通过外键约束相互关联,则索引维护相对容易陷入死锁情况。您可以通过在执行加载之前更改要填充的表以删除所有索引(自动增量主键除外),然后在之后重新创建它们来解决此问题。或者,您可以摆脱并发性并仅使用一个线程执行ETL的L部分。由于所有索引维护,线程并不能像直观上那样帮助提高吞吐量。避免在多个并发线程上运行数据定义语言(CREATE TABLE、CREATE INDEX 等)。对这些东西进行故障排除比其价值更麻烦。此外,在事务中包装大约数百行的每个块的 INSERT 可以以惊人的方式帮助 ETL 吞吐量。在每个主干之前,说BEGIN TRANSACTION;&nbsp;在每个块之后说COMMIT;&nbsp;为什么这有帮助?因为 COMMIT 操作需要时间,并且不在显式事务中的每个操作后面都会有一个隐式 COMMIT。

鸿蒙传说

我发现这个问题的一个可能的解决方案是重试机制。如果发生死锁 - 睡眠并多尝试几次直到成功,同时将 DF 保留在内存中:class Query(abc.ABC):&nbsp; &nbsp; def __init__(self):&nbsp; &nbsp; &nbsp; &nbsp; self.engine = MysqlEngine.engine()&nbsp; &nbsp; ....&nbsp; &nbsp; ....&nbsp; &nbsp; def load(self, df: pd.DataFrame) -> None:&nbsp; &nbsp; &nbsp; &nbsp; for i in range(5):&nbsp; # If load fails due to a deadlock, try 4 more times&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; df.to_sql(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name=self.table,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; con=self.engine.connect(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if_exists="replace",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; index=False,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except sqlalchemy.exc.OperationalError as ex:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if "1213" in repr(ex):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.warning(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "Failed to acquire lock for %s", self.__class__.__name__&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sleep(1)死锁仍然会发生,并且您会损失一些性能,但它胜过重新执行整个 Extrac - Transform。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python