猿问

使用 SQLAlchemy 表达式时出现 Dask read_sql_table 错误

我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 一起使用,以便关闭通过连接和过滤几个不同表创建的数据集。该文件表明,这应该是可能的。


(下面的示例不包括任何连接,因为复制问题不需要它们。)


我构建了我的连接字符串,创建了一个 SQLAlchemy 引擎和与我的数据库中的表相对应的表。(我正在使用 PostgreSQL。)


import dask.dataframe as dd

import pandas as pd

from sqlalchemy import create_engine

from sqlalchemy import Column, MetaData, Table

from sqlalchemy.sql import  select



username = 'username'

password = 'password'

server = 'prod'

database = 'my_db'


connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'


engine = create_engine(connection_string)


metadata = MetaData()


t = Table('my_table', metadata,

    Column('id'),

    schema='my_schema')

我能够构建一个选择并将它与 SQLAlchemy 一起使用,没有问题


>>> s = select([t]).limit(5)

>>> rp = engine.execute(s)

>>> rp.fetchall()


[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]

我还可以将 SQLAlchey 选择提供给熊猫的 read_sql,它工作正常


>>> pd.read_sql(s, connection_string)


id

0   3140757

1   3118225

2   3156070

3   3193075

4   3114614

但是,当我将相同的选择传递给 dask 时,我收到了 ProgrammingError。它表明dask正在转身并调用pandas.read_sql,所以你会认为它应该工作,但显然不是。


江户川乱折腾
浏览 251回答 3
3回答

白猪掌柜的

正如 Chris 在不同的答案中所说,Dask 以某种形式包装您的查询SELECT columns FROM (yourquery),这对于 PostgreSQL 来说是无效的语法,因为它需要括号表达式的别名。无需重新实现整个read_sql_table方法,表达式可以简单地通过添加.alias('somename')到您的选择中来别名,即select([t]).limit(5).alias('foo')该表达式,当被 Dask 包装时,为 Postgres 生成正确的语法SELECT columns FROM (yourquery) AS foo

慕婉清6462132

对于遇到此问题的任何其他人。read_sql_table 似乎不支持这个用例(此时)。如果你传入一个 SQLAlchemy Select 对象,它最终会被包裹在另一个 SQLAlchemy Select 中并且没有别名,这是糟糕的 SQL(至少对于 PostgreSQL)。从 dask 源查看 read_sql_table,table 是传递给 read_sql_table 的 Select 对象,正如所见,它被包装在另一个选择中。q = sql.select(columns).where(sql.and_(index >= lower, cond)                              ).select_from(table)好消息是 read_sql_table 函数相对简单,而且魔术实际上只有几行从延迟对象创建数据帧。您只需要编写自己的逻辑即可将查询分成块parts = []for query_chunk in queries:    parts.append(delayed(_read_sql_chunk)(q, uri, meta, **kwargs))return from_delayed(parts, meta, divisions=divisions)def _read_sql_chunk(q, uri, meta, **kwargs):    df = pd.read_sql(q, uri, **kwargs)    if df.empty:        return meta    else:        return df.astype(meta.dtypes.to_dict(), copy=False)

跃然一笑

该行发送的查询是由 SQLAlchemy 自动生成的,因此语法应该是正确的。但是,我注意到您的原始查询包含一个.limit()修饰符。该行的目的head =是获取前几行,以推断类型。如果原始查询已经有一个限制子句,我可以看到两者可能会发生冲突。请尝试使用不带 的查询.limit()。
随时随地看视频慕课网APP

相关分类

Python
我要回答