调用 SQL Server 存储过程来提取 CSV 的 Python 代码需要几个小时才能执行

我使用 python 使用 Pandas 读取 CSV,修复一些字段,然后将数据逐行写入 SQL Server 中的表。服务器上禁用批量导入 - 另外,因为最终会有数十个这样的文件,以自动下载和摄取文件。我可以看到这需要几分钟,但运行需要几个小时。

我知道如果启用的话,我可以在几秒钟内批量上传这些内容,但这可能是不可能的。

问题是使用 python 每次运行可能需要 1 到 3 个小时。这是不可接受的。我想知道是否有更快的方法来完成此上传。我可以对表格做些什么来加快导入速度,或者采用不同的编码方式吗?

这是我正在使用的代码类型的示例:

def ingest_glief_reporting_exceptions_csv():

    global conn

    global cursor

    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"

    # filename = r"repex_1K.csv"


    full_filename = os.path.join(raw_data_dir, filename)


    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"

    cursor.execute(sql_str)

    last_lei = ''

    for result in cursor.fetchall():

        last_lei = result[0]


    # "repex" is short for "reporting exceptions", shorten the headers

    repex_headers = [

        'LEI',

        'ExceptionCategory',

        'ExceptionReason1',

        'ExceptionReason2',

        'ExceptionReason3',

        'ExceptionReason4',

        'ExceptionReason5',

        'ExceptionReference1',

        'ExceptionReference2',

        'ExceptionReference3',

        'ExceptionReference4',

        'ExceptionReference5'

    ]


    df = pd.read_csv(full_filename, header=0, quotechar='"')


    # Change to the column headers generated in VBA

    df.columns = repex_headers


    for colname in df.columns:

        df[colname] = df[colname].astype(str)

        df[colname] = df[colname].replace({'nan': ''})



    place_holder = '?,?'

    for i in range(1, len(repex_headers)):

        place_holder += ',?'


    sql_str = "exec save_gleif_reporting_exception " + place_holder


    row_count = 0

    row = dict()

    do_not_upload = True

    if last_lei == '':

        do_not_upload = False   # There was no last uploaded record, so we can start now


    for index, row in df.iterrows():

        row_count += 1

        if do_not_upload:

            if row['LEI'] == last_lei:

                do_not_upload = False

                continue

            else:

                continue

        )


有只小跳蛙
浏览 152回答 2
2回答

慕丝7291255

由于您不需要存储过程的返回值,因此您应该能够使用 pandas 的to_sql方法将行直接插入表中。这段代码...from time import timeimport pandas as pdimport sqlalchemy as safrom_engine = sa.create_engine("mssql+pyodbc://@mssqlLocal64")to_engine = sa.create_engine(    "mssql+pyodbc://sa:_whatever_@192.168.0.199/mydb"    "?driver=ODBC+Driver+17+for+SQL+Server",    fast_executemany=False,)# set up testto_cnxn = to_engine.raw_connection()to_cnxn.execute("TRUNCATE TABLE MillionRows")to_cnxn.commit()num_rows_to_upload = 10000df = pd.read_sql_query(    f"SELECT TOP {num_rows_to_upload} "    "[TextField], [LongIntegerField], [DoubleField], [varchar_column] "    "FROM MillionRows ORDER BY ID",    from_engine,)# run testt0 = time()df.to_sql("MillionRows", to_engine, index=False, if_exists="append")s = f"{(time() - t0):0.1f} seconds"print(f"uploading {num_rows_to_upload:,d} rows took {s}")… 表示与您现在所做的工作大致相同的内部工作水平,即,将每个单独的行作为单独的调用上传.execute。结果是uploading 10,000 rows took 60.2 seconds但是,简单地更改to_engine为使用fast_executemany=True结果uploading 10,000 rows took 1.4 seconds

侃侃尔雅

关闭自动提交 conn = pyodbc.connect(        r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \        Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=False) 并在此处和循环结束时提交。    if index % 1000 == 0:            print("Imported %s rows" % (index))使用自动提交,您必须等待日志文件在每一行之后保存到磁盘。为了进一步优化,如果您使用 SQL 2016+,请使用 JSON 将批量行发送到 SQL Server,并使用OPENJSON在服务器端进行解析。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python