手记

Databricks与优化后的Snowflake性能大比拼

我是一位 Snowflake 的首席解决方案架构师一职 ,拥有超过 17 年的数据策略、架构和工程经验。在这里提到的观点仅代表我个人,并不一定反映我当前、过往或未来的雇主的观点

在之前的帖子中,Robert Thompson 比较了在 Snowflake 和 Databricks 上运行查询。这是诚心比较两个平台的一次尝试,但大多数“基准测试”中,情况可能并非表面所见。

Databricks 与 Snowflake 的数据对比By Robert Thompsonmedium.com

这篇文章发表于2024年11月,并提到了以下内容:

因为它还没有进入GA阶段,也不适合用于生产环境,因此所有这些测试都没有使用Iceberg。

与其测试Iceberg的GA功能,基准测试使用的是Parquet格式的外部表,这不利用元数据统计信息来取得像Snowflake那样的性能优势,许多人对此已经习惯了。此外,最近的一篇LinkedIn帖子表明,Iceberg表的性能比外部Parquet表高25倍。Iceberg表在开放源数据格式上表现出色的性能,无需借助Snowflake计算资源,就可以用外部引擎进行查询。

为什么不继续罗伯特的工作,看看所谓的Snowflake的Managed Iceberg表与最初公布的结果有何不同?

设置部分

我认为给那些想自己尝试的人提供可重复的步骤很重要。罗伯特给了我他用过的数据链接: https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=azureml-opendatasets。我最初加载了原始数据,但发现列名和数据格式随着时间变化。相反,我启动了一个Databricks集群,并使用笔记本加载了整个数据范围,使用了Azure开放数据集Python库,然后导出了到我的blob存储中。

    # 这是一个预览版的包。  
    # 需要在 Databricks 集群中通过 pip 安装 azureml-opendatasets。 https://learn.microsoft.com/azure/data-explorer/connect-from-databricks#install-the-python-library-on-your-azure-databricks-cluster  

    !pip install azureml-opendatasets  

    from azureml.opendatasets import NycTlcYellow  

    from datetime import datetime  
    from dateutil import parser  

    ## 获取完整的时间范围  
    start_date = parser.parse('2009-01-01')  
    end_date = parser.parse('2018-12-31')  
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)  
    nyc_tlc_df = nyc_tlc.to_spark_dataframe()  

    # 验证是否返回了数据  
    display(nyc_tlc_df.limit(5))  

    ####  
    ## 请根据您的 Blob 存储帐户进行相应的修改  
    ####  
    storage_account_name = "<your_storage_account>"  
    storage_account_access_key = "<your_access_key>"  
    container_name = "<your_container>"  

    # 使用访问密钥来配置 Spark 会话  
    spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_access_key)  

    # 存放 Parquet 文件的路径  
    output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/yellow"  

    # 将 DataFrame 写入 Parquet  
    nyc_tlc_df.write.mode("overwrite").parquet(output_path)

当数据被导出到Azure后,我就能够连接Azure的Snowflake账户(你也可以使用AWS或GCP中的账户)。使用存储集成功能连接了Azure存储账户之后,我为Iceberg用例创建了一个外部表。从那里,我将Azure Blob存储中的Parquet数据加载到了Snowflake Iceberg表结构中。

    --创建一个新的存储集成以连接到blobstore  
    USE ROLE ACCOUNTADMIN;  
    CREATE STORAGE INTEGRATION azure_int  
      TYPE = EXTERNAL_STAGE  
      STORAGE_PROVIDER = 'AZURE'  
      ENABLED = TRUE  
      AZURE_TENANT_ID = '<your Azure Tenant>'  
      STORAGE_ALLOWED_LOCATIONS = ('azure://<storage_account>.blob.core.windows.net/<container>/');  

    GRANT USAGE ON INTEGRATION azure_int to role sysadmin;  

    --我们还需要一个外部卷来写入Iceberg  
    CREATE OR REPLACE EXTERNAL VOLUME AZURE_EXT_VOLUME  
       STORAGE_LOCATIONS =  
        (  
          (  
            NAME = 'azure-iceberg-volume'  
            STORAGE_PROVIDER = 'AZURE'  
            STORAGE_BASE_URL = 'azure://<storage_account>.blob.core.windows.net/<container>/'  
            AZURE_TENANT_ID = '<your Azure Tenant>'  
          )  
        );  
    GRANT USAGE ON EXTERNAL VOLUME AZURE_EXT_VOLUME TO ROLE SYSADMIN;  

    USE ROLE SYSADMIN;   

    --创建一个新的数据库、模式和阶段  
    CREATE DATABASE SNOW_DB;  
    CREATE SCHEMA SNOW_SCHEMA;  
    CREATE STAGE taxi_data_stage  
    URL='azure://<storage_account>.blob.core.windows.net/<container>/yellow'  
    STORAGE_INTEGRATION = azure_int;  

    --创建加载Parquet文件的文件格式  
    CREATE OR REPLACE FILE FORMAT parquet_file_format TYPE = 'parquet'   
    USE_VECTORIZED_SCANNER = TRUE;  

    --创建一个Iceberg表  
    create or replace ICEBERG TABLE NYCTLCYELLOW_IB (  
     "vendorID" VARCHAR(16777216),  
     "tpepPickupDateTime" TIMESTAMP_NTZ(6),  
     "tpepDropoffDateTime" TIMESTAMP_NTZ(6),  
     "passengerCount" NUMBER(38,0),  
     "tripDistance" FLOAT,  
     "puMonth" NUMBER(38,0),  
     "tipAmount" FLOAT,  
     "startLon" FLOAT,  
     "paymentType" VARCHAR(16777216),  
     "endLat" FLOAT,  
     "startLat" FLOAT,  
     "fareAmount" FLOAT,  
     "mtaTax" FLOAT,  
     "improvementSurcharge" VARCHAR(16777216),  
     "endLon" FLOAT,  
     "tollsAmount" FLOAT,  
     "totalAmount" FLOAT,  
     "puLocationId" VARCHAR(16777216),  
     "rateCodeId" NUMBER(38,0),  
     "storeAndFwdFlag" VARCHAR(16777216),  
     "doLocationId" VARCHAR(16777216),  
     "extra" FLOAT,  
     "puYear" NUMBER(38,0)  
    )  
     EXTERNAL_VOLUME = 'AZURE_EXT_VOLUME'  
     CATALOG = 'SNOWFLAKE'  
     BASE_LOCATION = 'NYCTLCYELLOW_IB/';  

    --让我们将数据加载到NYCTLCYELLOW_IB  
    COPY INTO NYCTLCYELLOW_IB  
    FROM @taxi_data_stage  
    FILE_FORMAT = (format_name = PARQUET_FILE_FORMAT)  
    MATCH_BY_COLUMN_NAME = 'CASE_SENSITIVE'  
    PATTERN='.*[.]parquet';  

    --如果您希望优化性能  
    --让我们根据tpepPickupDateTime重新排序数据  
    INSERT OVERWRITE INTO NYCTLCYELLOW_IB  
    SELECT * FROM NYCTLCYELLOW_IB ORDER BY "tpepPickupDateTime";
查询执行情况

我没有记录加载结果的情况,而是关注了查询执行的时间。对于其中两个查询,我重写了它们以利用Snowflake的性能优势,这并没有影响数据的输出。虽然选择查询本身没有改变,但在插入数据之后,我使用了INSERT OVERWRITE INTO ORDER BY子句(如上代码所示),这自然地对表进行了聚类操作。这是一步可选的操作,但也是良好的实践,可以防止数据出现倾斜,并有助于Snowflake优化器高效地进行数据修剪

提供了完整的设置和运行代码,这样其他人可以轻松复现结果。以下是修改后的两个查询。

一般的查询:

    --原查询  
    SELECT "puYear", "puMonth", "totalAmount"  
    FROM (  
        SELECT "puYear", "puMonth", "totalAmount", ROW_NUMBER() OVER (partition by "puYear", "puMonth" order by "totalAmount") as rn  
        FROM nyctlcyellow_ib  
    ) ranked  
    WHERE ranked.rn = 1;  

    --修改后的查询1. 不需要外层的选择。我使用了QUALIFY(筛选条件)  
    SELECT "puYear", "puMonth", "totalAmount", ROW_NUMBER() OVER (partition by "puYear", "puMonth" order by "totalAmount") as rn  
    FROM nyctlcyellow_ib  
    QUALIFY rn=1;  

    --修改后的查询2. 不需要进行排名,直接取最小值!   
    SELECT "puYear", "puMonth", MIN("totalAmount") FROM nyctlcyellow_ib  
    GROUP BY "puYear", "puMonth"  
    ORDER BY 1,2;

荒谬的提问:


    --原始查询在Snowflake上的执行情况。  
    --注意,Snowflake上的查询时间大约是之前所说的50%。  
    SELECT count(*)  
    FROM (  
    SELECT *  
    FROM NYCTLCYELLOW_IB  
    GROUP BY *  
    ) a ;  

    --修改后的查询2。所有行的哈希值唯一计数。  
    SELECT COUNT (DISTINCT HASH(*))  
    FROM NYCTLCYELLOW_IB;

以下是一个代码片段,详细说明了我如何执行这些查询并得到结果。这是一个简单的查询示例工作表。每个查询都运行了三次,并关闭了缓存以避免结果被扭曲。

    --使用 xs WH  
    USE WAREHOUSE COMPUTE_XS_WH;  

    --关闭仓库并禁用结果缓存  
    ALTER SESSION SET USE_CACHED_RESULT = FALSE;  
    ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND;  

    --设置查询标签以从 QH 获取我们的结果  
    ALTER SESSION SET QUERY_TAG = 'Simple:XS';  

    --运行查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --暂停仓库以避免使用 WH 缓存。等待 2 秒...  
    ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND;  
    SELECT SYSTEM$WAIT(2);  

    --运行查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --暂停仓库以避免使用 WH 缓存。等待 2 秒...  
    ALTER WAREHOUSE COMPUTE_XS_WH SUSPEND;  
    SELECT SYSTEM$WAIT(2);  

    --运行查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --取消设置查询标签  
    ALTER SESSION UNSET QUERY_TAG;  

    --使用 Medium 仓库  
    USE WAREHOUSE COMPUTE_M_WH;  

    --设置查询标签以从 QH 获取我们的结果  
    ALTER SESSION SET QUERY_TAG = 'Simple:M';  

    --运行简单查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --暂停仓库以避免使用 WH 缓存。等待 2 秒...  
    ALTER WAREHOUSE COMPUTE_M_WH SUSPEND;  
    SELECT SYSTEM$WAIT(2);  

    --运行查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --暂停仓库以避免使用 WH 缓存。等待 2 秒...  
    ALTER WAREHOUSE COMPUTE_M_WH SUSPEND;  
    SELECT SYSTEM$WAIT(2);  

    --运行查询  
    SELECT *  
    FROM nyctlcyellow_ib  
    ORDER BY "tpepPickupDateTime" DESC  
    LIMIT 1000;  

    --取消设置查询标签  
    ALTER SESSION UNSET QUERY_TAG;  

    --完成运行后,您可以通过程序化方式获取查询时间和成本的结果  
    --根据之前的博客文章,我使用了 $3 的信用成本  
    select QUERY_TAG, AVG(TOTAL_ELAPSED_TIME / 1000) TIME_IN_SEC, COUNT(*),  
    CASE WHEN ANY_VALUE(WAREHOUSE_SIZE) = 'X-Small' then 0.0003 WHEN ANY_VALUE(WAREHOUSE_SIZE) = 'Medium' THEN 0.0011 END * 3 CREDIT_COST,  
    TIME_IN_SEC * CREDIT_COST QUERY_COST  
    from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY  
    WHERE QUERY_TAG LIKE '%:%'  
    AND QUERY_TYPE = 'SELECT'  
    AND QUERY_TEXT <> 'SELECT SYSTEM$WAIT(2);'  
    AND EXECUTION_STATUS = 'SUCCESS'  
    AND WAREHOUSE_SIZE IS NOT NULL  
    AND START_TIME > CURRENT_DATE - 2  
    GROUP BY QUERY_TAG  
    ORDER BY 1;
成果

这些数字看起来大不一样!简单的查询和选择性查询没有被改动,在Snowflake上使用Iceberg后,这些查询的速度快了大约15倍。

中等和荒诞的查询与未修改的 Databricks 大致相同。此外,对于荒诞的查询,我使用 Iceberg 获得的结果比之前少了大约一半,而且没有任何改动。修改后的查询则便宜了惊人的 3 到 4 倍!

这是结果如下。

最后

尽管前一篇文章没有真正理解如何优化 Snowflake 就试图诚实地比较这些引擎。经过一些小的调整,一些结果减少了 80%。在优化查询的情况下,使用 Iceberg 管理表的 Snowflake 不仅速度更快,而且成本更低,比 Databricks 更有优势。

我的几点主要收获如下:

  1. 优化您的平台工作负载。 知道如何编写高效的SQL可以节省大量成本——微小的调整会产生巨大的影响。另外,了解数据加载的最佳实践,如排序数据,使选择查询在分区裁剪的帮助下执行得更快。
  2. 跟上新特性。 Snowflake平台不断变化。跟上最新功能可以为您节省时间和金钱——例如Iceberg、动态和混合表等“新”特性提供了许多以前不存在的好处。
  3. 了解您的用例的业务价值。 引擎的价值不仅体现在查询成本上。每个平台都提供独特的功能,可以帮助您的业务获取见解并减少实现价值的时间。一个能够迅速交付结果的平台非常重要,但这只有这样,它才算符合您的业务需求和时间表。例如,在一个平台上交付数据产品可能需要6周,而在另一个平台上只需要6天?此示例是完全虚构的——请务必测试您的数据和需求,以确保它们满足您的业务目标。
0人推荐
随时随地看视频
慕课网APP