使用 Spark SQL 处理数据

已完成

Dataframe API 是名为 Spark SQL 的 Spark 库的一部分,它使数据分析师能够使用 SQL 表达式来查询和操作数据。

在 Spark 目录中创建数据库对象

Spark 目录是关系数据对象(例如视图和表)的元存储。 Spark 运行时可以使用目录将用任何 Spark 支持的语言编写的代码与 SQL 表达式无缝集成,对于一些数据分析师或开发人员来说,SQL 表达式可能更合理。

使数据帧中的数据可用于在 Spark 目录中查询的最简单方法之一是创建一个临时视图,如以下代码示例所示:

df.createOrReplaceTempView("products_view")

视图是临时的,这意味着它会在当前会话结束时被自动删除。 还可以创建持久保存在目录中的表,以定义可以使用 Spark SQL 查询的数据库。

表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 在 Microsoft Fabric 中,托管表的数据存储在数据湖中显示的“表”存储位置中,使用 Spark 创建的表都在此处列出。

可使用 spark.catalog.createTable 方法创建空表,也可使用其 saveAsTable 方法将数据帧另存为表。 删除托管表也会删除其基础数据。

例如,以下代码将数据帧保存为名为 products 的新表:

df.write.format("delta").saveAsTable("products")

注意

Spark 目录支持基于各种格式的文件的表。 Microsoft Fabric 中的首选格式是 delta,它是 Spark 上名为 Delta Lake 的关系数据技术的格式。 Delta 表支持关系数据库系统中常见的功能,包括事务、版本控制和流式处理数据支持。

此外,可使用 spark.catalog.createExternalTable 方法创建外部表。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖的“文件”存储区域中的文件夹。 删除外部表不会删除基础数据。

提示

就像上一单元中就 parquet 文件探讨的那样,可以对 delta lake 表应用相同的分区技术。 通过对表进行分区,可在查询时获得更好的性能。

使用 Spark SQL API 查询数据

可以使用采用任何语言编写的代码中的 Spark SQL API 来查询目录中的数据。 例如,以下 PySpark 代码使用 SQL 查询将 products 表中的数据作为数据帧返回。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

代码示例的结果类似于下表:

ProductID ProductName ListPrice
771 Mountain-100 Silver, 38 3399.9900
839 Road-750 黑色,52 539.9900
... ... ...

使用 SQL 代码

前面的示例演示了如何使用 Spark SQL API 在 Spark 代码中嵌入 SQL 表达式。 在笔记本中,还可以使用 %%sql magic 来运行查询目录中的对象的 SQL 代码,如下所示:

%%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 代码示例返回一个结果集,该结果集在笔记本中自动显示为表:

类别 ProductCount
骑行背带短裤 3
自行车车架 1
单车存放架 1
... ...