查询流式处理数据
可以使用 Azure Databricks 通过结构化流查询流式处理数据源。 Azure Databricks 为 Python 和 Scala 中的流式处理工作负载提供广泛的支持,并支持使用 SQL 的大多数结构化流式处理功能。
以下示例演示如何在笔记本中交互式开发期间使用内存接收器手动检查流式处理数据。 由于笔记本 UI 中的行输出限制,可能无法观察流式处理查询读取的所有数据。 在生产工作负荷中,应仅通过将流式处理查询写入目标表或外部系统来触发流式处理查询。
注意
对流式处理数据的交互式查询的 SQL 支持仅限于在全用途计算上运行的笔记本。 在 Databricks SQL 或 Delta 实时表中声明流式处理表时,也可以使用 SQL。 请参阅 在 Databricks SQL 中使用流式处理表加载数据和什么是 Delta 实时表?。
向流式处理系统查询数据
Azure Databricks 为以下流式处理系统提供流数据读取器:
- Kafka
- Kinesis
- PubSub
- Pulsar
针对这些系统初始化查询时,必须提供配置详细信息,具体取决于配置的环境和你选择从中读取的系统。 请参阅配置流式处理数据源。
涉及流式系统的常见工作负载包括将数据引入到湖屋中,并进行流处理以将数据传输到外部系统。 有关流式处理工作负载的详细信息,请参阅 Azure Databricks 上的流式处理。
以下示例演示从 Kafka 读取的交互式流式处理:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
将表查询为流式处理读取
默认情况下,Azure Databricks 使用 Delta Lake 创建所有表。 对 Delta 表执行流式处理查询时,该查询会在提交表的版本时自动选取新记录。 默认情况下,流式处理查询要求源表仅包含追加的记录。 如果需要处理包含更新和删除的流数据,Databricks 建议使用 Delta 实时表和 APPLY CHANGES INTO
。 请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获。
以下示例演示如何执行从表读取的交互式流式处理:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
使用自动加载程序查询云对象存储中的数据
可以使用 Azure Databricks 云数据连接器自动加载程序从云对象存储流式传输数据。 可以将连接器与存储在 Unity Catalog 卷或其他云对象存储位置中的文件配合使用。 Databricks 建议使用卷来管理对云对象存储中的数据的访问权限。 请参阅连接到数据源。
Azure Databricks 会优化此连接器,以便在云对象存储中流式引入数据,这些数据存储在常用的结构化、半结构化和非结构化格式中。 Databricks 建议以近乎原始的格式存储引入的数据,以最大程度地提高吞吐量,并最大程度地减少由于记录或架构更改而导致的潜在数据丢失。
有关从云对象存储引入数据的更多建议,请参阅将数据引入到 Databricks 湖屋中。
以下示例演示从卷中的 JSON 文件的目录读取的交互式流式处理:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')