共用方式為


在 Apache Spark 上優化寫入的需求

巨量數據處理引擎上的分析工作負載,例如 Apache Spark 在使用標準化較大的檔案大小時,執行效率最高的。 檔案大小、檔案數目、Spark 背景工作角色數目及其組態之間的關聯性,對於效能扮演重要角色。 將數據湖數據表中擷取工作負載,可能會有不斷寫入許多小型檔案的繼承特性:此案例通常稱為「小型檔案問題」。

優化寫入是 Synapse 上的 Delta Lake 功能,可減少寫入的檔案數目,並旨在增加寫入數據的個別檔案大小。 它會以動態方式優化分割區,同時產生預設為128 MB大小的檔案。 目標檔案大小可能會使用 組態來變更每個工作負載需求。

這項功能會透過在分割區上使用額外的數據隨機階段來達到檔案大小,在寫入數據時造成額外的處理成本。 小型寫入的懲罰應該超過數據表的讀取效率。

注意

  • 它適用於 Apache Spark 3.1 以上的 Synapse 集區。

優化寫入的優點

  • 它適用於 Batch 和串流寫入模式的 Delta Lake 數據表。
  • 不需要變更 spark.write 命令模式。 此功能是由組態設定或數據表屬性啟用。
  • 相較於 OPTIMIZE 命令,它會減少寫入交易的數目。
  • OPTIMIZE 作業會更快,因為它會在較少的檔案上運作。
  • 刪除舊未參考檔案的 VACUUM 命令也會更快運作。
  • 查詢會掃描檔案大小較佳的較少檔案,以改善讀取效能或資源使用量。

優化寫入使用案例

使用時機

  • Delta Lake 分割數據表受限於寫入模式,這些模式會產生次佳(小於 128 MB)或非標準檔案大小(本身大小不同的檔案)。
  • 將寫入磁碟且檔案大小次佳的數據框架重新分割。
  • 以 UPDATE、DELETE、MERGE、CREATE TABLE AS SELECT、INSERT INTO 等小型批次 SQL 命令為目標的 Delta Lake 分割數據表。
  • 將數據模式附加至 Delta Lake 數據分割數據表的串流擷取案例,其中額外的寫入延遲是可容忍的。

何時避免

  • 非數據分割數據表。
  • 無法接受額外寫入延遲的使用案例。
  • 具有定義完善的優化排程和讀取模式的大型數據表。

如何啟用和停用優化寫入功能

預設會停用優化寫入功能。 在Spark 3.3集區中,預設會針對分割數據表啟用它。

設定集區或會話的組態之後,所有Spark寫入模式都會使用此功能。

若要使用優化寫入功能,請使用下列組態加以啟用:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

若要檢查目前的組態值,請使用 命令,如下所示:

  1. Scala 和 PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

若要停用優化寫入功能,請變更下列組態,如下所示:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

使用數據表屬性控制優化寫入

在新數據表上

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

使用 DeltaTableBuilder API

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

在現有的數據表上

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

使用 DeltaTableBuilder API

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

如何取得及變更優化寫入的目前最大檔案大小組態

若要取得目前的組態值,請使用貝洛命令。 預設值為 128 MB。

  1. Scala 和 PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • 變更組態值
  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

下一步