我在 docker 容器内有一个多线程 ETL 进程,看起来像这样的简化代码:
class Query(abc.ABC):
def __init__(self):
self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)
def load(self, df: pd.DataFrame) -> None:
df.to_sql(
name=self.table, con=self.connection, if_exists="replace", index=False,
)
@abc.abstractmethod
def transform(self, data: object) -> pd.DataFrame:
pass
@abc.abstractmethod
def extract(self) -> object:
pass
# other methods...
class ComplianceInfluxQuery(Query):
# Implements abstract methods... load method is the same as Query class
ALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"),ComplianceInfluxQuery("cc_influx_we_count")....]
while True:
with ThreadPoolExecutor(max_workers=8) as pool:
for query in ALL_QUERIES:
pool.submit(execute_etl, query) # execute_etl function calls extract, transform and load
load()许多类继承自 Query,具有与类中所示相同的实现,Query它只是将 pandas DataFrame 对象加载到 sql 表中,并替换该表(如果存在)。
所有类同时运行,并在完成后将结果加载到 MySQLExtract()数据库Transform()。每个类都会将不同的表加载到数据库中。
load()当调用该方法时,我经常会从随机线程中遇到死锁:
日志显示了load()
两个线程几乎同时调用的方法。无论数据如何,这种情况都可能发生在所有类中。
我运行了命令SHOW ENGINE INNODB STATUS
,那里没有列出死锁。
cc_influx_share_count
我检查了 General_log 表以更好地了解死锁期间发生的情况,但除了死锁的线程在(我认为)应该具有的情况下没有向表中插入任何值这一事实之外,没有注意到任何有用的信息:
该错误于 09:48:28,241 提出
SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;
此 ETL 是运行 MySQL 的唯一进程。我已阅读有关死锁发生原因的文档,但我无法理解两个之间没有连接的不同表如何导致死锁。我知道我可以简单地load()
再次运行该方法直到成功,但我想了解为什么会发生死锁以及如何防止它们。
MySQL版本是8.0.21。蟒蛇3.8.4。sqlalchemy 1.3.19。熊猫 1.0.5。PyMySQL 0.10.1。
拉丁的传说
鸿蒙传说
相关分类