結構化串流的生產考量
本文章包含使用 Azure Databricks 上的工作排程結構化串流工作負載的建議。
Databricks 建議一律執行下列動作:
- 將筆記本中不必要的程式碼移除,這類程式碼會返回結果,例如
display
和count
。 - 請勿使用所有用途計算來執行結構化串流工作負載。 一律使用工作計算將串流排程為工作。
- 使用
Continuous
模式排程工作。 - 請勿針對結構化串流工作啟用計算的自動調整。
某些工作負載受益於下列各項:
Azure Databricks 引進了 Delta Live Tables,以減少管理結構化串流工作負載生產基礎結構的複雜性。 Databricks 建議針對新的結構化串流管線使用 Delta Live Tables。 請參閱什麼是差異即時資料表?。
注意
在縮小結構化串流工作負載的叢集大小時,計算自動調整有其限制。 Databricks 建議針對串流工作負載使用差異即時資料表與增強型自動調整。 請參閱 使用增強型自動調整來優化差異實時數據表管線的叢集使用率。
為串流工作負載設計預期失敗的能力
Databricks 建議一律設定串流工作,以在失敗時自動重新啟動。 某些功能,包括結構描述演進,假設結構化串流工作負載已設定為自動重試。 請參閱 <設定結構化串流工作,以在失敗時重新啟動串流查詢>。
某些運算,例如 foreachBatch
提供至少一次,而不是確切一次的保證。 針對這些運算,您應該讓處理管線具有等冪。 請參閱<使用 foreachBatch 寫入任意資料接收器>。
注意
當查詢重新啟動時,會處理上一次執行期間計劃的微批次。 如果您的工作因記憶體不足錯誤而失敗,或因超大型的微批次而手動取消工作,您可能需要相應擴大計算,才能成功處理微批次。
如果您變更執行之間的組態,這些設定會套用至第一個新批次方案。 請參閱 <在結構化串流查詢變更之後的復原>。
工作何時重試?
您可以將多個工作排程為 Azure Databricks 工作的一部分。 當您使用連續觸發設定工作時,無法設定工作之間的相依性。
您可以選擇使用下列其中一種方法,在單一工作中排程多個串流:
- 多個工作:定義具有使用連續觸發執行串流工作負載之多個工作的工作。
- 多個查詢:在單一工作的原始程式碼中定義多個串流查詢。
您也可以合併這些策略。 下個資料表將比較這兩種方法。
多項工作 | 多個查詢 | |
---|---|---|
如何共用計算? | Databricks 建議將工作的計算大小適當地部署到每個串流工作。 您可以選用跨工作共用計算。 | 所有查詢都會共用相同的計算。 您可以選用將查詢指派給 排程者集區。 |
重試如何處理? | 所有工作都必須在工作重試之前失敗。 | 如果有任何查詢失敗,工作會重試。 |
設定結構化串流工作以在失敗時重新啟動串流查詢
Databricks 建議使用連續觸發設定所有串流工作負載。 請參閱持續執行作業。
連續觸發預設會提供下列行為:
- 防止多個並行執行工作。
- 在上一次執行失敗時開始新的執行。
- 使用指數輪詢重試。
Databricks 建議在排程工作流程時一律使用工作計算,而不是所有用途的計算。 在工作失敗並重試時,會部署新的計算資源。
注意
您不需要使用 streamingQuery.awaitTermination()
或 spark.streams.awaitAnyTermination()
。 當串流查詢處於作用中狀態時,工作會自動防止執行完成。
針對多個串流查詢使用排程者集區
您可以從相同的原始程式碼執行多個串流查詢時,設定排程集區將計算容量指派給查詢。
根據預設,筆記本中開始的所有查詢都會在相同的公平排程集區中執行。 來自筆記本中所有串流查詢的觸發所產生的 Apache Spark 工作會以「先進先出」(FIFO) 順序逐一執行。 這可能會導致查詢中不必要的延遲,因為它們無法有效率地共用叢集資源。
排程者集區允許您宣告哪些結構化串流查詢共用計算資源。
下列範例將 query1
指派至專用集區,而 query2
與 query3
則共用排程者集區。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
注意
區域屬性組態必須位於您開始串流查詢相同的筆記本資料欄中。
如需其他詳細資料,請參閱 <Apache 公平排程者文件>。