处理 Spark 数据帧中的数据
Spark 原本使用一种称为弹性分布式数据集 (RDD) 的数据结构;但是虽然可以编写直接处理 RDD 的代码,但在 Spark 中处理结构化数据最常用的数据结构是数据帧,它作为 Spark SQL 库的一部分提供。 Spark 中的数据帧类似于通用 Pandas Python 库中的数据帧,但经过优化,可以在 Spark 的分布式处理环境中工作。
注意
除了 Dataframe API,Spark SQL 还提供了 Java 和 Scala 支持的强类型 Dataset API。 在本模块中,我们将重点介绍 Dataframe API。
将数据加载到数据帧中
我们来看看一个假设示例,了解如何使用数据帧来处理数据。 假设你在湖屋的“文件/数据”文件夹中名为 products.csv 的以逗号分隔的文本文件中有以下数据:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
推断架构
在 Spark 笔记本中,可以使用以下 PySpark 代码将文件数据加载到数据帧中并显示前 10 行:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
开头的 %%pyspark
行称为 magic,它告诉 Spark 此单元格中使用的语言是 PySpark。 可以在 Notebook 界面的工具栏中选择要用作默认语言的语言,然后使用 magic 替代特定单元格的默认语言选择。 例如,下面是产品数据示例的等效 Scala 代码:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
magic %%spark
用于指定 Scala。
这两个代码示例都会产生如下输出:
ProductID | ProductName | 类别 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
... | ... | ... | ... |
指定显式架构
在前面的示例中,CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构,格式如下:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
结果将再次类似于以下内容:
ProductID | ProductName | 类别 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
... | ... | ... | ... |
提示
指定显式架构还可以提高性能!
对数据帧进行筛选和分组
可以使用 Dataframe 类的方法来对所包含的数据进行筛选、排序、分组和执行其他操作。 例如,以下代码示例使用 select 方法从包含前面示例中的产品数据的 df 数据帧中检索 ProductID 和 ListPrice 列:
pricelist_df = df.select("ProductID", "ListPrice")
此示例代码的结果可能如下所示:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
与大多数数据操作方法一样,select 返回一个新的数据帧对象。
提示
从数据帧中选择部分列是一种常见的操作,也可以通过使用以下较短的语法来实现:
pricelist_df = df["ProductID", "ListPrice"]
可以将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码将 select 和 where 方法链接在一起来创建新的数据帧,其中包含用于“山地自行车”或“公路自行车”类别的产品的 ProductName 和 ListPrice 列:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
此示例代码的结果可能如下所示:
ProductName | 类别 | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
Road-750 黑色,52 | 公路自行车 | 539.9900 |
... | ... | ... |
要对数据进行分组和聚合,可以使用 groupBy 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
此示例代码的结果可能如下所示:
类别 | count |
---|---|
耳机 | 3 |
车轮 | 14 |
山地自行车 | 32 |
... | ... |
保存数据帧
通常需要使用 Spark 来转换原始数据并保存结果,以便进一步分析或进行下游处理。 以下代码示例将数据帧保存到数据湖中的 Parquet 文件中,替换任何同名的现有文件。
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
注意
对于用于进一步分析或引入到分析存储的数据文件,通常首选 Parquet 格式。 Parquet 是一种非常高效的格式,大多数大规模数据分析系统都支持这种格式。 事实上,有时数据转换要求可能只是将数据从其他格式(如 CSV)转换为 Parquet!
对输出文件进行分区
分区是一项优化技术,使 Spark 能够最大限度地提高工作器节点的性能。 在查询中筛选数据时,通过消除不必要的磁盘 IO 可以实现更高的性能提升。
要将数据帧另存为一组已分区的文件,可在写入数据时使用 partitionBy 方法。 以下示例保存 bikes_df 数据帧(其中包含 mountain bikes 和 road bikes 类别的产品数据),并按类别对数据进行分区:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
对数据帧进行分区时生成的文件夹名称包含格式为“列=值”的分区列名称和值,因此,此代码示例创建一个名为 bike_data 的文件夹,其中包含以下子文件夹:
- Category=Mountain Bikes
- Category=Road Bikes
每个子文件夹都包含一个或多个 parquet 文件,其中包含相应类别的产品数据。
注意
可按多个列对数据进行分区,这会为每个分区键生成一个文件夹层次结构。 例如,你可以按年份和月份对销售订单数据进行分区,这样一来,在文件夹层次结构中,每个年份值都有一个文件夹,而在这些年份文件夹中,每个月份值都有一个子文件夹。
加载已分区的数据
将已分区的数据读入数据帧时,可以通过为已分区的字段指定显式值或通配符,从层次结构中的任何文件夹加载数据。 以下示例加载 Road Bikes 类别中产品的数据 :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
注意
生成的数据帧会省略文件路径中指定的分区依据列。 该示例查询生成的结果不包括 Category 列 - 所有行的类别都将是 Road Bikes。