使用 Spark 結構化串流將資料串流至 Lakehouse
結構化串流是基於 Spark 構建的可調整且容錯的串流處理引擎。 Spark 會負責累加且持續地執行串流作業,因為資料會繼續送達。
結構化串流在Spark 2.2中可供使用。 自那以後,它一直是資料流的建議方法。 結構化資料流背後的基本原則是將即時資料流視為一律持續附加新資料的資料表,就像資料表中的新資料列一樣。 有一些定義的內建串流檔案來源,例如 CSV、JSON、ORC、Parquet,以及 Kafka 和事件中樞等傳訊服務的內建支援。
本文提供如何透過具有高輸送量的生產環境中Spark結構化串流,將事件的處理和擷取優化見解。 建議的方法包括:
- 資料串流輸送量最佳化
- 最佳化差異資料表中的寫入作業和
- 事件批處理
Spark 作業定義和 Spark 筆記本
Spark 筆記本是驗證想法並執行實驗以從您的資料或程式碼取得見解的絕佳工具。 筆記本廣泛用於資料準備、視覺效果、機器學習和其他巨量資料案例中。 Spark 作業定義是長時間在Spark叢集上執行的非互動式程式碼導向工作。 Spark 作業定義提供強固性和可用性。
Spark 筆記本是測試程式碼邏輯並解決所有商務需求的絕佳來源。 不過,為了在生產案例中保持執行,已啟用重試原則的Spark作業定義是最佳解決方案。
Spark 作業定義的重試原則
在 Microsoft Fabric 中,使用者可以設定 Spark 作業定義的重試原則。 雖然作業中的腳本可能無限,但執行腳本的基礎結構可能會產生需要停止作業的問題。 或者作業可能會因為基礎結構修補需求而消除。 重試原則可讓使用者在因任何基礎問題而停止時,設定自動重新啟動作業的規則。 參數會指定應該重新啟動作業的頻率、最多無限次重試,以及設定重試之間的時間。 如此一來,使用者就可以確保其Spark作業定義工作會無限地繼續執行,直到使用者決定停止它們為止。
串流來源
使用事件中樞設定串流需要基本設定,包括事件中樞命名空間名稱、中樞名稱、共用存取密鑰名稱和取用者群組。 取用者群組可讓您檢視整個事件中樞。 它可讓多個取用的應用程式個別檢視事件數據流,並以自己的步調和位移獨立讀取數據流。
分割區是能夠處理大量資料的重要部分。 單一處理器每秒處理事件的容量有限,而多個處理器可以在平行執行時執行更好的作業。 資料分割允許平行處理大量事件的可能性。
如果太多分割區搭配低擷取速率使用,分割讀取器會處理此資料的一小部分,而導致處理不理想的處理。 理想的分割區數目會直接取決於所需的處理速率。 如果您想要調整事件處理規模,請考慮新增更多分割區。 分割區上沒有特定的輸送量限制。 不過,命名空間中的彙總輸送量受限於輸送量單位數目。 當您增加命名空間中的輸送量單位數目時,您可能想要額外的分割區允許並行讀取器達到最大輸送量。
建議調查及測試輸送量案例的最佳分割區數目。 但是,使用32個以上的分割區來查看高輸送量的案例很常見。
Azure 事件中樞 Connector for Apache Spark (azure-event-hubs-spark) 建議將 Spark 應用程式連線至 Azure 事件中樞。
Lakehouse 作為串流接收器
Delta Lake 是一個開放原始碼儲存層,在資料湖儲存解決方案之上提供 ACID(原子性、一致性、隔離性和持久性)異動。 Delta Lake 也支援可調整的中繼資料處理、架構演進、時間移動(資料版本控制)、開放格式和其他功能。
在網狀架構 資料工程師 中,Delta Lake 可用來:
- 使用 Spark SQL 輕鬆地向上插入(插入/更新)和刪除資料。
- 壓縮資料,以將查詢資料所花費的時間降到最低。
- 在執行作業前後檢視資料表的狀態。
- 擷取資料表上執行的作業歷程記錄。
Delta 會新增為 writeStream 中使用的其中一種可能的輸出接收格式。 有關現有輸出接收器的更多資訊,請參閱 Spark 結構化串流程式設計指南。
下列範例示範如何將資料串流至 Delta Lake。
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
關於範例中已擷選的程式碼:
- format() 是定義資料輸出格式的指令。
- outputMode() 會定義資料流中新資料列的寫入方式(也就是附加、覆寫)。
- toTable() 會將串流資料保存在使用傳遞為參數的值所建立的 Delta 資料表中。
最佳化差異寫入
資料分割是建立健全串流解決方案的重要部分:資料分割可改善資料的組織方式,也會改善輸送量。 差異作業之後,檔案很容易分散,導致太多小型檔案。 由於在磁碟上寫入檔案的時間很長,所以檔案太大也是個問題。 資料分割的挑戰是尋找適當的平衡,以產生最佳的檔案大小。 Spark 支援在儲存體和磁碟上進行分割。 適當分割的資料可以在將資料保存到 Delta Lake 並從 Delta Lake 查詢資料時,提供最佳效能。
- 在磁碟上分割資料時,您可以選擇如何使用 partitionBy() 根據資料行來分割資料。 partitionBy() 是一個函式,用來根據寫入磁碟時提供的一或多個資料行,將大型語意模型分割成較小的檔案。 資料分割是使用大型語意模型時改善查詢效能的方法。 避免選擇產生太小或太大資料分割的資料行。 根據具有良好基數的資料行集來定義資料分割,並將資料分割成大小最佳的檔案。
- 您可以使用 repartition() 或 coalesce(),在儲存體中分割資料,在多個背景工作節點上散發資料,以及建立多個工作,以使用彈性分散式資料集 (RDD) 的基本概念平行讀取和處理資料。 它允許將語意模型分割成邏輯分割區,這可以在叢集的不同節點上計算。
- repartition() 用來增加或減少儲存體中的資料分割數目。 重新分割會透過網路重新調整整個資料,並在所有分割區之間平衡。
- coalesce() 僅用於有效率地減少分割區數目。 這是重新分割的 最佳化版本, 其中所有資料分割的資料移動會使用coalesce() 較低。
在具有高輸送量的案例中,結合這兩種資料分割方法是很好的解決方案。 repartition() 會在儲存體中建立特定數目的資料分割,而 partitionBy() 會將檔案寫入每個儲存體分割區和分割資料行的磁碟。 下列範例說明相同 Spark 作業中這兩個分割策略的使用方式:資料會先分割成儲存體中的 48 個分割區(假設我們總共有 48 個 CPU 核心),然後根據承載中兩個現有資料行的磁碟分割。
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
最佳化寫入
將寫入最佳化至 Delta Lake 的另一個選項是使用最佳化寫入。 最佳化寫入是選擇性功能,可改善將資料寫入 Delta 資料表的方式。 Spark 會在寫入資料之前合併或分割資料分割,將寫入磁碟的資料輸送量最大化。 不過,它會產生完整的隨機顯示,因此對於某些工作負載,可能會導致效能降低。 使用 coalesce() 和/或 repartition() 分割磁碟上資料的作業可以重構為改用 Optimized Write 開始。
下列程式碼是使用 Optimized Write 的範例。 請注意,仍然使用 partitionBy()。
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
批處理事件
為了將作業數目降至最低,以改善將資料擷取到 Delta Lake 所花費的時間,批處理事件是實用的替代方案。
觸發程式會定義串流查詢應該執行的頻率(觸發)併發出新的數據。 設定它們會定義微盤的定期處理時間間隔,將數據和批處理事件累積到少數保存作業中,而不是一直寫入磁碟。
下列範例顯示串流查詢,其中事件會定期以一分鐘的間隔處理。
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
在 Delta 資料表寫入作業中合併事件批處理的優點是,它會建立較大的 Delta 檔案,並在其中建立更多資料,以避免小型檔案。 您應該分析所擷取的資料量,並尋找最佳處理時間,以最佳化 Delta 連結庫所建立的 Parquet 檔案大小。
監視
Spark 3.1 和更新版本具有內建 結構化串流 UI ,其中包含下列串流計量:
- 輸入速率
- 處理序速率
- 輸入資料列
- 批次持續時間
- 操作持續時間