使用 SQL 转换数据

已完成

借助提供 DataFrame 结构的 SparkSQL 库,你还可以使用 SQL 作为处理数据的一种方式。 通过此方法,你可使用 SQL 查询来查询和转换 DataFrame 中的数据,并将结果保存为表。

注意

表是基于文件的元数据抽象。 关系表中不存储数据,但该表基于 Data Lake 中的文件提供了一个关系层。

定义表和视图

Spark 中的表定义存储在元存储中,后者是一个用于基于文件封装关系抽象的元数据层。 外部表是元存储中的关系表,用于引用指定的 Data Lake 位置中的文件。 可通过查询表或直接从 Data Lake 中读取文件来访问此数据。

注意

外部表与基础文件“松散绑定”,删除表不会删除文件。 这样,你就可以使用 Spark 来完成繁重的转换工作,然后将数据保存在 Data Lake 中。 完成此操作后,可以删除表,并且下游进程可以访问这些优化的结构。 还可以定义托管表,其中的基础数据文件存储在与元存储关联的内部管理存储位置。 托管表与文件“紧密绑定”,删除托管表将删除关联的文件。

以下代码示例将 DataFrame(从 CSV 文件加载)保存为外部表名称 sales_orders。 这些文件存储在 Data Lake 的 /sales_orders_table 文件夹中。

order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

使用 SQL 查询和转换数据

定义表后,可以使用 SQL 查询和转换其数据。 以下代码创建了两个新的派生列(“年份”和“月份”),然后创建了一个添加了新派生列的新表 transformed_orders。

# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")

# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')

新表的数据文件存储在文件夹层次结构中,其格式为 Year=*NNNN* / Month=*N*,其中每个文件夹都包含按年份和月份排列相应订单的 parquet 文件。

查询元存储

由于此新表是在元存储中创建的,因此你可以通过第一行中的 %%sql 魔钥使用 SQL 对其进行查询,以指示将使用 SQL 语法,如下面的脚本所示:

%%sql

SELECT * FROM transformed_orders
WHERE Year = 2021
    AND Month = 1

删除表

使用外部表时,可以使用 DROP 命令从元存储中删除表定义,而不会影响 Data Lake 中的文件。 利用此方法,你可以在使用 SQL 转换数据后清理元存储,同时使转换后的数据文件可用于下游数据分析和引入过程。

%%sql

DROP TABLE transformed_orders;
DROP TABLE sales_orders;