修改并保存数据帧

已完成

Apache Spark 提供数据帧对象作为处理数据的主要结构。 可使用数据帧来查询和转换数据,并将结果保存在数据湖中。 要将数据加载到数据帧中,可使用 spark.read 函数,指定要读取的数据的文件格式、路径和(可选)架构。 例如,以下代码将 orders 文件夹中所有 .csv 文件中的数据加载到名为 order_details 的数据帧中,然后显示前五条记录。

order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))

转换数据结构

将源数据加载到数据帧后,可使用数据帧对象的方法和 Spark 函数对其进行转换。 数据帧上的典型操作包括:

  • 筛选行和列
  • 重命名列
  • 创建新列,通常派生自现有列
  • 替换 null 或其他值

在下面的示例中,代码使用 split 函数将 CustomerName 列中的值分隔为名为 FirstName 和 LastName 的两个新列。 然后使用 drop 方法删除原始 CustomerName 列。

from pyspark.sql.functions import split, col

# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")

display(transformed_df.limit(5))

你可使用 Spark SQL 库的完备功能通过筛选行、派生、删除、重命名列以及应用其他所需的数据修改来转换数据。

保存转换后的数据

数据帧处于所需结构后,可将结果保存为数据湖中受支持的格式。

以下代码示例将数据帧保存到数据湖中的 Parquet 文件中,替换任何同名的现有文件。

transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")

注意

对于用于进一步分析或引入到分析存储的数据文件,通常首选 Parquet 格式。 Parquet 是一种非常高效的格式,大多数大规模数据分析系统都支持这种格式。 事实上,有时数据转换要求可能只是将数据从其他格式(如 CSV)转换为 Parquet!