手记

在Python笔记本中探索PySpark、Pandas、DuckDB、Polars和DataFusion的数据操作技巧

数据工程师和科学家常常使用各种工具来处理不同类型的数据操作——从大规模的分布式处理到内存中的数据操作。alexmerced/spark35nb Docker镜像使这个过程变得更简单,因为它提供了一个预设环境,在这个环境中,你可以尝试多种流行的数据工具,包括PySpark、Pandas、DuckDB、Polars和DataFusion。

在这个博客里,我们将一步步教您如何搭建这个环境,并演示如何使用这些工具执行基本的数据处理,例如写入数据、加载数据以及执行查询和聚合操作。不管是处理大型数据集,还是操作小型内存数据,您会发现这些不同库之间如何相互配合。

第1节:搭建您的开发环境
1.1 拉取 Docker 镜像

要开始,请先从Docker Hub拉取alexmerced/spark35nb这个Docker镜像。这个镜像自带一套预配置的环境,其中包含了Spark 3.5.2、JupyterLab,还有许多常用的数据处理库,比如Pandas、DuckDB和Polars。

执行以下命令来拉取图像:

运行以下命令来拉取镜像:`docker pull alexmerced/spark35nb`

接下来,使用以下命令来启动容器:

你可以运行这个命令来启动容器并将这些端口映射到主机:

docker run -p 8888:8888 -p 4040:4040 -p 7077:7077 -p 8080:8080 -p 18080:18080 -p 6066:6066 -p 7078:7078 -p 8081:8081 alexmerced/spark35nb

一旦容器启动并开始运行,打开您的浏览器并在其中输入 localhost:8888 来访问 JupyterLab,在那里您就可以进行所有数据相关的操作了。

现在你已经设置好环境,我们可以开始用PySpark、Pandas、DuckDB、Polars和DataFusion做一些基本的数据操作了。

第二部分:使用PySpark进行数据分析(一种基于Python的Spark接口)
2.1 什么是PySpark呢?

PySpark 是 Apache Spark 的 Python API,Apache Spark 是一个开源引擎,旨在进行大规模数据处理和分布式计算任务。它允许你将数据和计算任务分布在集群上,从而处理大规模数据。虽然 Spark 通常在分布式集群中运行,不过这种设置也允许你在单个节点上本地运行——非常适合开发和测试环境。

使用 PySpark,你可以处理数据、执行 SQL 查询、进行机器学习等,所有这些操作都在一个能够高效处理大数据的框架中完成。在这一部分,我们将介绍如何在 JupyterLab 环境中使用 PySpark 来写入及查询数据。

2.2 用 PySpark 写入数据:

让我们从在PySpark中创建一个简单的数据集开始吧。首先,初始化一个Spark会话来使用Spark功能。我们将创建一个包含示例数据的小DataFrame并展示它。

    从pyspark.sql导入SparkSession作为spark  

    # 初始化Spark会话:PySpark 示例程序  
    spark = SparkSession.builder.appName("PySpark 示例程序").getOrCreate()  
    # 样例数据:包含姓名和年龄的元组列表作为样例数据  
    data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]  
    # 创建名为df的DataFrame  
    df = spark.createDataFrame(data, ["姓名", "年龄"])  
    # 显示df  
    df.show()

在这个例子中,我们创建了一个带有三行数据的DataFrame,分别代表人们的姓名和年龄。通过df.show()可以查看DataFrame的内容,让我们轻松检查刚刚创建的数据。

2.3 用 PySpark 加载和查询数据

首先,让我们从文件中加载数据集并运行一些基本的查询。PySpark可以处理各种文件格式,比如CSV、JSON和Parquet。

对于这个例子,假设我们有一个包含更多关于人员信息的CSV文件,我们将这个CSV文件加载到一个DataFrame中。然后我们将展示一个简单的筛选查询和聚合操作来统计每个年龄段的人数。

    # 将CSV文件加载到DataFrame中  
    df_csv = spark.read.csv("data/people.csv", header=True, inferSchema=True)  

    # 显示DataFrame的前几行数据  
    df_csv.show()  

    # 筛选出年龄大于30的人  
    df_filtered = df_csv.filter(df_csv["Age"] > 30)  

    # 显示筛选后的DataFrame  
    df_filtered.show()  

    # 按年龄分组并统计每个年龄组的人数  
    df_grouped = df_csv.groupBy("Age").count()  

    # 显示分组后的结果  
    df_grouped.show()

在这个示例中,我们使用 spark.read.csv() 将一个 CSV 文件加载了。然后,我们进行了两种不同的操作:如下,这些操作与加载的 CSV 文件相关。

  • 筛选:我们只筛选出 DataFrame 中年龄大于 30 的行。
  • 汇总:我们按年龄分组并统计每个年龄段的人数。

使用PySpark,你可以在大规模数据集中执行更复杂的查询和聚合,使其成为处理大数据的强大工具。

接下来,我们将来看看 Pandas,它非常适合处理小规模的内存数据,不需要进行分布式处理。

第三:用Pandas操作数据
3.1 Pandas是什么?

Pandas 是最常用的 Python 库之一,用于数据操作和分析。它提供了诸如 DataFrame 之类易于使用的数据结构,让您直观地处理表格数据。与 PySpark 不同,后者是为大规模分布式数据处理设计的,Pandas 在内存中操作,非常适合处理小型到中型数据集。

使用“Pandas”,你可以从各种格式(如 CSV、Excel 和 JSON)读取和写入各种格式的数据,并使用简单易懂的语法执行常见的数据操作,如过滤、聚合及合并数据。

使用Pandas加载数据

让我们从将一个数据集加载到Pandas DataFrame中开始。比如说,我们将读取一个CSV文件,这是一种常用的数据文件格式,常用于数据存储,并查看前几行数据。

    import pandas as pd  

    # 从 CSV 文件加载数据到 Pandas DataFrame  
    df_pandas = pd.read_csv("data/people.csv")  

    # 打印数据框的前五行记录  
    print(df_pandas.head())

在这个例子中,我们使用 pd.read_csv() 读取了 people.csv 文件,并将其加载到 Pandas DataFrame 中。head() 方法可以显示 DataFrame 的前几行,这对于快速检查数据非常有用。

3.3 基本操作 with Pandas

现在我们已经加载了数据。让我们执行一些基本操作,例如筛选行和对数据进行分组。Pandas 让你可以使用简单的 Python 语法轻松地执行这些操作。

    # 过滤数据,只显示年龄大于30的人  
    df_filtered = df_pandas[df_pandas["Age"] > 30]  

    # 打印(df_filtered)  
    print(df_filtered)  

    # 按 'Age' 分组并计算每个年龄的人数  
    df_grouped = df_pandas.groupby("Age").count()  

    # 按 'Age' 分组并计算每个年龄组的人数  
    df_grouped = df_pandas.groupby("Age").count()  

    # 打印(df_grouped)  
    print(df_grouped)

在这里,我们使用了一个简单的布尔条件来筛选数据,只筛选了30岁以上的人。然后,我们使用了groupby()函数按年龄进行分组,并统计了每个年龄组的人数。

Pandas在内存数据操作方面极其高效,使其成为适合放在内存中的小数据集的理想工具。在接下来的部分里,我们将探讨DuckDB,这是一个基于SQL的工具,可以快速查询内存中的数据。

第四节:看看DuckDB
4.1 DuckDB是什么?

DuckDB 是一种内存中的 SQL 数据库管理系统(DBMS),专门设计用于分析工作负载。它允许您在 Python 环境中高效地直接查询数据集,提供高性能。DuckDB 特别适合对结构化数据(如 CSV 或 Parquet 文件)执行复杂的 SQL 查询,而无需额外设置单独的数据库服务器。

DuckDB 虽轻量却功能强大,可以用作 SQLite 这类工具的替代品,特别是在对大型数据集进行分析查询时。

4.2 将数据写进DuckDB

DuckDB可以很容易地与Pandas结合使用,让您可以把Pandas DataFrame中的数据导入DuckDB进行SQL查询。下面是如何使用Pandas数据在DuckDB中创建表。

    import duckdb  

    # 连接到内存中的 DuckDB 数据库实例  
    conn = duckdb.connect()  

    # 将 Pandas DataFrame 转换为 DuckDB 中的表  
    conn.execute("CREATE TABLE people AS SELECT * FROM df_pandas")  

    # 查看 'people' 表的数据  
    conn.execute("SELECT * FROM people").df()

在这个例子中,我们连接到了DuckDB,并利用Pandas DataFrame df_pandas创建了一个新的表people。DuckDB的execute()函数允许你运行SQL命令,使通过SQL查询来操作数据变得非常简单。

4.3 查询 DuckDB 数据

一旦您的数据被加载到DuckDB中,您就可以运行SQL查询来过滤、聚合以及分析您的数据。DuckDB支持一系列广泛的SQL功能,非常适合那些更喜欢使用SQL而非Python来操作数据的用户。

    # 查询年龄超过30岁的人  
    result = conn.execute("SELECT Name, Age FROM people WHERE Age > 30").df()  

    # 显示查询结果  
    print(result)  

    # 按年龄分组并统计每个年龄组的人数  
    result_grouped = conn.execute("SELECT Age, COUNT(*) as count FROM people GROUP BY Age").df()  

    # 显示分组后的结果  
    print(result_grouped)

在这个例子中,我们使用SQL筛选了people表,只选择了那些大于30岁的人。接下来我们对数据进行了分组查询,统计每个年龄段的人数情况。

当您需要在 Python 环境中直接使用类似 SQL 的功能时,DuckDB 是一个很好的选择。无需设置和管理数据库服务器,您就可以利用 SQL 的强大功能。接下来,我们将探讨 Polars,一个以速度快和效率高著称的数据框库。

第五部分:使用Polars进行快速数据框操作
5.1 什么是Polars?

Polars 是一个高性能数据操作的 DataFrame 库,专门用于快速和高效的数据处理,因其速度快和效率高而闻名,特别是在与类似 Pandas 的库相比时。Polars 是用 Rust 编写的,并采用了一个优化的查询引擎,能够迅速处理大型数据集,并且内存占用很少。它还提供了与 Pandas 类似的接口,使得学习和将其集成到现有的 Python 工作流中变得简单。

Polars非常适合处理难以完全加载到内存中的大规模数据集,或者在性能至关重要的情况下。

5.2 与 Polars 一起工作

让我们从一个Python字典开始,创建一个Polars DataFrame。然后我们将做一些基本的操作,比如过滤和聚合数据。

    import polars as pl  

    # 创建一个名为Polars的DataFrame  
    df_polars = pl.DataFrame({  
        "Name": ["Alice", "Bob", "Catherine"],  
        "Age": [34, 45, 29]  
    })  
    # 打印这个Polars DataFrame  
    print(df_polars)

在这个例子中,我们使用一个Python字典创建了一个DataFrame(数据框)。语法与Pandas类似,但是操作进行了速度优化。Polars提供了延迟执行,这意味着它可以同时优化多个操作,节省计算时间。

5.3 使用 Polars 进行筛选和汇总

现在,让我们做一些常用的数据操作,比如筛选和汇总数据。这些操作在Polars里做了优化,可以使用简单明了的语法来完成。

    # 筛选 DataFrame,只保留年龄大于30的记录  
    df_filtered = df_polars.filter(pl.col("Age") > 30)  

    # 打印筛选后的 DataFrame  
    print(df_filtered)  

    # 按 'Age' 分组并计数每个年龄组中的条目数  
    df_grouped = df_polars.groupby("Age").count()  

    # 打印分组结果  
    print(df_grouped)

在这个例子中,具体来说,我们筛选了数据,只显示年龄大于30的行,然后我们按年龄分组数据,统计每个年龄组的人数。得益于Polars的优化内存管理和查询执行引擎,这些操作因此非常高效。

Polars 是一个非常适合处理大小数据集的 DataFrame 库,当性能是关键要求时。接下来,我们将探索 DataFusion,这是一个用于基于 SQL 查询处理 Apache Arrow 数据的工具。

第6节:DataFusion在查询执行中的应用
6.1 DataFusion 是什么?

DataFusion 是一个基于 Apache Arrow(一种高效的数据分析列式内存格式)的内存查询执行引擎,专门用于在内存中执行查询。它提供了一个强大的 SQL 引擎,允许用户对存储在 Arrow 格式中的结构化数据执行复杂的查询。DataFusion 属于 Apache Arrow 生态系统,旨在为不同的数据处理工具提供快速的数据互操作性。

DataFusion特别适合对大型内存数据集进行SQL查询,而无需传统数据库的额外开销。它与Arrow的集成确保数据处理既快速又节省内存。

使用DataFusion进行数据写入和查询

DataFusion 允许你在内存数据上执行 SQL 查询,利用 Apache Arrow 技术。我们先用 DataFusion 创建一个 DataFrame,然后对它执行几个 SQL 查询。

    from datafusion 导入 SessionContext  

    # 初始化一个 DataFusion 会话  
    ctx = SessionContext()  

    # 用一些数据创建一个 DataFrame  
    data = [  
        {"Name": "Alice", "Age": 34},  
        {"Name": "Bob", "Age": 45},  
        {"Name": "Catherine", "Age": 29}  
    ]  

    # 将 DataFrame 注册为表  
    df = ctx.create_dataframe(data)  
    ctx.register_table("people", df)  

    # 查询数据,选择年龄大于 30 的人  
    result = ctx.sql("SELECT Name, Age FROM people WHERE Age > 30").collect()  

    # 打印结果  
    print(result)

在这个例子中,我们使用 DataFusion 的会话上下文创建了一个 DataFrame 并将其注册成一张表。然后我们执行了一个简单的 SQL 查询来筛选出年龄超过 30 的人。DataFusion 让你能够结合使用 SQL 的强大功能和 Apache Arrow 内存格式带来的高速和效率。

使用DataFusion进行数据聚合

和在 DuckDB 中一样,我们可以通过聚合查询操作按特定字段对数据进行分组并统计每个组中的记录数。在 DataFusion 中,让我们看看这如何实现。

    # 按 'Age' 分组并统计每个年龄段的人数
    result_grouped = ctx.sql("SELECT Age, COUNT(*) as count FROM people GROUP BY Age").collect()  

    # 显示分组结果
    print(result_grouped)

在这次查询中,我们按“Age”列对数据进行了分组处理,并统计了每个年龄组的人数情况。DataFusion的SQL执行引擎确保即使是内存中存储的大数据集,查询也能运行得非常高效。

DataFusion 是一款非常适合需要快速查询大内存数据集的用户的工具,它基于 SQL,并能充分利用 Apache Arrow 的高性能列式数据格式。特别适合用来构建需要大量查询结构化数据的分析管道系统。

Bonus 部分:将 Python 与 Dremio 集成:
Dremio是什么呢?

Dremio 是一个强大的数据湖仓平台,帮助组织整合和查询来自各种来源的数据。它使用户能够轻松地管理和加速处理他们的数据,而无需昂贵且复杂的数仓基础设施。Dremio 能够直接从 Apache Iceberg、Delta Lake、S3、RDBMS 和 JSON 文件等格式的数据访问和查询数据,加上其性能优化,减轻了传统数据仓库的工作量。

Dremio 是建立在 Apache Arrow 之上的,Apache Arrow 是一种高性能的列式内存数据格式,并通过 Arrow Flight 加速大数据集在网上的传输。这种集成提供了闪电般快速的查询响应,同时支持各种分析工具之间的互操作性。

在本节中,我们将演示如何在 Docker 容器中启动 Dremio,并使用 Python 和 dremio-simple-query 查询 Dremio 数据源。

六.一 使用 Docker 部署 Dremio

要在本地机器上运行Dremio,请使用下面的Docker命令:

    docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 -p 32010:32010 -e DREMIO_JAVA_SERVER_EXTRA_OPTS=-Dpaths.dist=file:///opt/dremio/data/dist --name try-dremio dremio/dremio-oss

当 Dremio 正常运行后,您可以在浏览器中访问 http://localhost:9047 来进入 Dremio 用户界面。在这里,您可以进行设置数据源、创建虚拟数据表和探索平台的各种功能操作。

查询Dremio 6.2:通过dremio-simple 查询使用Python查询Dremio

dremio-simple-query库允许您使用Apache Arrow Flight查询Dremio,提供了高性能的接口来从Dremio源获取和分析数据。借助此库,可以轻松将Dremio查询转换为Pandas、Polars或DuckDB DataFrames,或者直接操作Apache Arrow数据。

这里是如何开始的:

第一步:安装必要的库

确保你已经安装了 dremio-simple-query 库(它已经预先安装在 alexmerced/spark35nb 镜像中)。你可以使用 pip 安装它。

    pip install dremio-simple-query

这将安装名为 dremio-simple-query 的 Python 包

第 2 步:连接到 Dremio

你需要你的Dremio凭证来获取一个令牌并建立连接。这里有一个简单的例子

    从 dremio_simple_query.connect 导入 get_token, DremioConnection 以便与中文编程习惯更加一致。  
    从 os 导入 getenv  
    从 dotenv 导入 load_dotenv  

    注释:加载环境变量 (TOKEN 和 ARROW_ENDPOINT,确保它们已正确设置) 以增加一些提示信息。  
    load_dotenv()  

    注释:登录 Dremio 并获取 token  
    login_endpoint = "http://{host}:9047/apiv2/login"  
    payload = {  
        "userName": "your_username",  
        "password": "your_password"  
    }  
    token = get_token(uri=login_endpoint, payload=payload)  

    注释:请确保使用正确的 host,以匹配你的 Dremio 实例  
    arrow_endpoint = "grpc://{host}:32010"  

    注释:使用 Arrow Flight 建立与 Dremio 的连接  
    dremio = DremioConnection(token, arrow_endpoint)

如果你是在本地用 docker run 命令运行这个程序,主机应该是 Dremio 容器在 Docker 网络中的 IP 地址,你可以通过运行 docker inspect 命令来找到它。

在这段代码里,我们使用 get_token 函数从 Dremio 的 REST API 获取认证 token,并连接至 Dremio 的 Arrow Flight 服务端点。

步骤三:查询 Dremio 并以各种格式获取数据

连接之后,你可以通过该连接以不同格式查询 Dremio 并获取结果,包括 Arrow、Pandas、Polars 和 DuckDB。下面是如何操作的:

查询数据并以 Apache Arrow 表形式返回:
    # 查询 Dremio 并获取数据作为 Apache Arrow 表  
    stream = dremio.toArrow("SELECT * FROM my_table;")
    arrow_table = stream.read_all()

    # 打印 Arrow 表
    print(arrow_table)
转换为数据框:
    # 将查询结果打印出来
    df = dremio.toPandas("查询 Dremio 并将结果作为 Pandas DataFrame 返回")
    print(df)

(Note: The second line of the comment should directly precede the code, not replace the function call in the code. Corrected version:)

    # 将查询结果打印出来
    df = dremio.toPandas("SELECT * FROM my_table;")
    print(df)
将数据转换为 Polars DataFrame:(注:Polars 是一个高性能的数据处理库)
    # 从 Dremio 获取数据并转换为 Polars DataFrame  
    df_polars = dremio.toPolars("SELECT * FROM my_table;")
    print(df_polars)
要试试用DuckDB查询:
    # 查询 Dremio 并将其作为 DuckDB 关系返回  
    duck_rel = dremio.toDuckDB("SELECT * FROM my_table")  

    # 执行对 DuckDB 关系的查询  
    result = duck_rel.query("my_table", "SELECT * FROM my_table WHERE Age > 30").fetchall()  

    # 打印结果  
    print(result)

使用 dremio-simple-query 库,您可以高效地从 Dremio 查询大型数据集,从而立即使用 Pandas、Polars 和 DuckDB 等各种工具进行分析,无需额外配置,同时利用高性能的底层 Apache Arrow 格式。

6.3 为什么使用Dremio呢?

Dremio 提供了若干优势,使其成为你数据栈中的一大强援。

  • 管理:集中管理所有数据源,确保合规并保持控制。
  • 数据联邦:将来自不同来源(如 Iceberg、Delta Lake、JSON、CSV 和关系数据库)的数据整合,而不移动数据。
  • 性能:借助 Dremio 的查询加速功能和 Apache Arrow Flight,提升查询速度。
  • 成本节省:通过卸载传统数据仓库的工作负载,Dremio 可以降低基础设施成本。

这确保了您的查询既快又高效。它还使您能够将各种数据源和工具无缝集成到分析工作流程中。

结尾

在这篇博客里,我们介绍了如何使用多种强大的工具来处理数据。从alexmerced/spark35nb Docker镜像开始,我们演示了如何设置一个包含PySpark、Pandas、DuckDB、Polars和DataFusion的开发环境,每个工具都针对不同的数据处理需求进行了优化设计。我们展示了如何利用每个工具的独特优势来进行基本的数据操作,例如写入、查询和聚合数据。

  • PySpark 支持大规模数据集的可扩展分布式处理能力,非常适合处理大数据环境中的任务。
  • Pandas 提供了易于操作的内存中的数据处理功能,适用于较小数据集的处理,是快速数据探索的首选工具。
  • DuckDB 提供了高效的内存中 SQL 引擎,非常适合进行无需复杂基础架构支持的分析查询。
  • Polars 带来了闪电般快速的数据帧操作,结合了性能和简洁性,适用于更大或性能至关重要的数据集。
  • DataFusion 基于 Apache Arrow,能够提供高性能的 SQL 查询,特别适合内存中的分析任务。

最后,我们介绍了Dremio,它与Apache Arrow集成,以实现在各种数据源间进行超快速的查询。通过简单的Dremio查询库,Dremio允许分析师快速获取并使用如Pandas、Polars和DuckDB等工具分析数据,确保数据在需要时随时随地可用,而无需传统数据仓库的额外负担。

无论你是处理小型数据集还是在分布式环境中应对海量数据,这种配置提供了一个灵活、高效且可扩展的平台,适用于任何数据工程或数据科学项目。通过结合使用这些工具,你可以从数据探索到大规模分析,覆盖整个数据处理过程,以最少的设置获得最佳性能。

0人推荐
随时随地看视频
慕课网APP