共用方式為


結構化串流的生產考量

本文章包含使用 Azure Databricks 上的工作排程結構化串流工作負載的建議。

Databricks 建議一律執行下列動作:

  • 將筆記本中不必要的程式碼移除,這類程式碼會返回結果,例如 displaycount
  • 請勿使用所有用途計算來執行結構化串流工作負載。 一律使用工作計算將串流排程為工作。
  • 使用 Continuous 模式排程工作。
  • 請勿針對結構化串流工作啟用計算的自動調整。

某些工作負載受益於下列各項:

Azure Databricks 引進了 Delta Live Tables,以減少管理結構化串流工作負載生產基礎結構的複雜性。 Databricks 建議針對新的結構化串流管線使用 Delta Live Tables。 請參閱什麼是差異即時資料表?

注意

在縮小結構化串流工作負載的叢集大小時,計算自動調整有其限制。 Databricks 建議針對串流工作負載使用差異即時資料表與增強型自動調整。 請參閱 使用增強型自動調整來優化差異實時數據表管線的叢集使用率。

為串流工作負載設計預期失敗的能力

Databricks 建議一律設定串流工作,以在失敗時自動重新啟動。 某些功能,包括結構描述演進,假設結構化串流工作負載已設定為自動重試。 請參閱 <設定結構化串流工作,以在失敗時重新啟動串流查詢>。

某些運算,例如 foreachBatch 提供至少一次,而不是確切一次的保證。 針對這些運算,您應該讓處理管線具有等冪。 請參閱<使用 foreachBatch 寫入任意資料接收器>。

注意

當查詢重新啟動時,會處理上一次執行期間計劃的微批次。 如果您的工作因記憶體不足錯誤而失敗,或因超大型的微批次而手動取消工作,您可能需要相應擴大計算,才能成功處理微批次。

如果您變更執行之間的組態,這些設定會套用至第一個新批次方案。 請參閱 <在結構化串流查詢變更之後的復原>。

工作何時重試?

您可以將多個工作排程為 Azure Databricks 工作的一部分。 當您使用連續觸發設定工作時,無法設定工作之間的相依性。

您可以選擇使用下列其中一種方法,在單一工作中排程多個串流:

  • 多個工作:定義具有使用連續觸發執行串流工作負載之多個工作的工作。
  • 多個查詢:在單一工作的原始程式碼中定義多個串流查詢。

您也可以合併這些策略。 下個資料表將比較這兩種方法。

多項工作 多個查詢
如何共用計算? Databricks 建議將工作的計算大小適當地部署到每個串流工作。 您可以選用跨工作共用計算。 所有查詢都會共用相同的計算。 您可以選用將查詢指派給 排程者集區
重試如何處理? 所有工作都必須在工作重試之前失敗。 如果有任何查詢失敗,工作會重試。

設定結構化串流工作以在失敗時重新啟動串流查詢

Databricks 建議使用連續觸發設定所有串流工作負載。 請參閱持續執行作業

連續觸發預設會提供下列行為:

  • 防止多個並行執行工作。
  • 在上一次執行失敗時開始新的執行。
  • 使用指數輪詢重試。

Databricks 建議在排程工作流程時一律使用工作計算,而不是所有用途的計算。 在工作失敗並重試時,會部署新的計算資源。

注意

您不需要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 當串流查詢處於作用中狀態時,工作會自動防止執行完成。

針對多個串流查詢使用排程者集區

您可以從相同的原始程式碼執行多個串流查詢時,設定排程集區將計算容量指派給查詢。

根據預設,筆記本中開始的所有查詢都會在相同的公平排程集區中執行。 來自筆記本中所有串流查詢的觸發所產生的 Apache Spark 工作會以「先進先出」(FIFO) 順序逐一執行。 這可能會導致查詢中不必要的延遲,因為它們無法有效率地共用叢集資源。

排程者集區允許您宣告哪些結構化串流查詢共用計算資源。

下列範例將 query1 指派至專用集區,而 query2query3 則共用排程者集區。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

區域屬性組態必須位於您開始串流查詢相同的筆記本資料欄中。

如需其他詳細資料,請參閱 <Apache 公平排程者文件>。