结构化流式处理的生产注意事项
本文包含有关在 Azure Databricks 上使用作业来计划结构化流式处理工作负载的建议。
Databricks 建议始终执行以下操作:
- 从返回结果的笔记本中删除不必要的代码,例如
display
和count
。 - 不要使用通用计算运行结构化流式处理工作负载。 始终使用作业计算将流作为作业进行计划。
- 使用
Continuous
模式计划作业。 - 不要为结构化流式处理作业启用计算自动缩放。
某些工作负载会受益于以下功能:
Azure Databricks 引入了增量实时表,以降低管理结构化流式处理工作负载的生产基础结构的复杂性。 Databricks 建议将增量实时表用于新的结构化流式处理管道。 请参阅什么是增量实时表?。
注意
计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议将增量实时表与增强式自动调整用于流式处理工作负载。 请参阅使用增强型自动缩放来优化 Delta Live Tables 管道的群集利用率。
设计流式处理工作负载来预期失败
Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)假定结构化流式处理工作负载已配置为自动重试。 将结构化流式处理作业配置为在失败时重启流式处理查询。
某些操作(例如 foreachBatch
)提供至少一次而不是恰好一次保证)。 对于这些操作,应确保处理管道是幂等的。 请参阅使用 foreachBatch 将内容写入到任意数据接收器。
注意
当查询重启时,将会处理在之前运行中计划的微批处理。 如果作业因内存不足错误而失败,或者由于超大型微批处理而手动取消了作业,则可能需要纵向扩展计算才能成功处理该微批处理。
如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复。
作业何时重试?
可以将多个任务计划为一个 Azure Databricks 作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。
可以选择使用以下任意一种方法在单个作业中计划多个流:
- 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
- 多查询:在单个任务的源代码中定义多个流式处理查询。
还可以组合使用这些策略。 下表比较了这些方法。
多个任务 | 多查询 | |
---|---|---|
如何共享计算? | Databricks 建议将大小适当的作业计算部署到每个流式处理任务。 可以选择跨任务共享计算。 | 所有查询共享相同的计算。 可以选择将查询分配给计划程序池。 |
如何处理重试? | 必须在所有任务失败之后才能进行作业重试。 | 如果任何查询失败,任务将会重试。 |
将结构化流式处理作业配置为在失败时重启流式处理查询
Databricks 建议使用连续触发器配置所有流式处理工作负载。 请参阅连续运行作业。
连续触发器默认提供以下行为:
- 阻止超过一个并发作业运行。
- 在上一次运行失败时启动新的运行。
- 对重试使用指数退避。
Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。
注意
不需要使用 streamingQuery.awaitTermination()
或 spark.streams.awaitAnyTermination()
。 当流式处理查询处于活动状态时,作业会自动防止运行完成。
将计划程序池用于多个流式处理查询
可以通过配置计划程序池,以便在从同一源代码运行多个流式处理查询时将计算容量分配给查询。
默认情况下,笔记本中启动的所有查询都在同一公平计划池中运行。 由触发器根据笔记本中的所有流式处理查询生成的 Apache Spark 作业将按照“先进先出”(FIFO) 的顺序依次运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。
通过计划程序池,可声明哪些结构化流式处理查询共享计算资源。
以下示例将 query1
分配给专用池,而 query2
和 query3
共享一个计划程序池。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
注意
本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。
有关更多详细信息,请参阅 Apache 公平计划程序文档。