在 Databricks SQL 中使用流式处理表加载数据
Databricks 建议使用流式处理表通过 Databricks SQL 引入数据。 流式处理表是一种注册到 Unity Catalog 的表,额外支持流式处理或增量数据处理。 系统会自动为每个流式处理表创建一个增量事实表管道。 可以使用流式处理表从 Kafka 和云对象存储进行增量数据加载。
本文演示如何使用流式处理表从配置为 Unity Catalog 卷(建议)或外部位置的云对象存储加载数据。
注意
若要了解如何使用 Delta Lake 表作为流式处理源和接收器,请参阅 Delta 表流式处理读取和写入。
重要
Databricks SQL 中创建的流式处理表由无服务器增量实时表管道提供支持。 工作区必须支持无服务器管道才能使用此功能。
开始之前的准备工作
在开始之前,必须满足以下要求。
工作区要求:
- 启用了无服务器的 Azure Databricks 帐户。 有关详细信息,请参阅启用无服务器 SQL 仓库。
- 一个启用了 Unity Catalog 的工作区。 有关详细信息,请参阅设置和管理 Unity Catalog。
计算要求:
你必须使用下列项之一:
使用
Current
通道的 SQL 仓库。Databricks Runtime 13.3 LTS 或更高版本上共享访问模式的计算。
Databricks Runtime 15.4 LTS 或更高版本上具有单用户访问模式的计算。
在 Databricks Runtime 15.3 及更低版本上,不能使用单个用户计算来查询其他用户拥有的流式处理表。 仅当拥有流式处理表时,才能在 Databricks Runtime 15.3 及更低版本上使用单用户计算。 表的创建者是所有者。
Databricks Runtime 15.4 LTS 及更高版本支持在单个用户计算上对增量实时表生成的表进行查询,而不考虑表所有权。 若要利用 Databricks Runtime 15.4 LTS 及更高版本中提供的数据筛选,你必须确认你的工作区是否已启用无服务器计算,因为支持增量实时表生成的表的数据筛选功能在无服务器计算上运行。 使用单个用户计算运行数据筛选操作时,可能会为无服务器计算资源付费。 请参阅单用户计算上的精细访问控制。
权限要求:
- 对 Unity Catalog 外部位置的
READ FILES
特权。 有关详细信息,请参阅创建外部位置以将云存储连接到 Azure Databricks。 - 对在其中创建流式处理表的目录的
USE CATALOG
特权。 - 对在其中创建流式处理表的架构的
USE SCHEMA
特权。 - 对在其中创建流式处理表的架构的
CREATE TABLE
特权。
其他要求:
源数据的路径。
卷路径示例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
外部位置路径示例:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
注意
本文假设要加载的数据位于云存储位置,该位置对应于你有权访问的 Unity Catalog 卷或外部位置。
发现和预览源数据
在工作区的边栏中,单击“查询”,然后单击“创建查询”。
在查询编辑器中,从下拉列表中选择使用该
Current
通道的 SQL 仓库。将以下内容粘贴到编辑器中,将尖括号 (
<>
) 中的值替换为标识源数据的信息,然后单击“运行”。注意
如果函数的默认值无法分析数据,则在运行
read_files
表值函数时可能会遇到架构推理错误。 例如,可能需要为多行 CSV 或 JSON 文件配置多行模式。 有关分析程序选项的列表,请参阅 read_files 表值函数。/* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
将数据加载到流式处理表中
若要用云对象存储中的数据创建流式处理表,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
设置运行时通道
使用 SQL 仓库创建的流式处理表使用增量实时表管道自动刷新。 默认情况下,增量实时表管道使用通道中的 current
运行时。 请参阅 Delta Live Tables 发行说明和发布升级过程 ,了解发布过程。
Databricks 建议将 current
通道用于生产工作负荷。 新功能首先发布到 preview
频道。 可以通过指定 preview
为表属性,将管道设置为预览增量实时表通道以测试新功能。 可以在创建表时或使用 ALTER 语句创建表后指定此属性。
下面的代码示例演示如何在 CREATE 语句中将通道设置为预览:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
## <a id="refresh"></a> Refresh a <st> using a DLT pipeline
This section describes patterns for refreshing a <st> with the latest data available from the sources defined in the query.
When you `CREATE` or `REFRESH` a <st>, the update processes using a serverless <DLT> pipeline. Each <st> you define has an associated <DLT> pipeline.
After you run the `REFRESH` command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.
.. note:: Only the table owner can refresh a <st> to get the latest data. The user that creates the table is the owner, and the owner can't be changed. You might need to refresh your <st> before using [time travel](/delta/history.md#time-travel) queries.
See [_](/delta-live-tables/index.md).
### Ingest new data only
By default, the `read_files` function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the `includeExistingFiles` option to `false`. This means that only data that arrives in the directory after table creation is processed. For example:
.. azure::
```sql
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
完全刷新流式处理表
完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。
例如:
REFRESH STREAMING TABLE my_bronze_table FULL
为流式处理表计划自动刷新
若要将流式处理表配置为根据定义的计划自动刷新,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
有关刷新计划查询的示例,请参阅 ALTER STREAMING TABLE。
跟踪刷新状态
可以在 Delta Live Tables UI 中查看管理流式处理表的管道或查看 DESCRIBE EXTENDED
命令为流式处理表返回的刷新信息,以此查看流式处理表的刷新状态。
DESCRIBE EXTENDED <table-name>
从 Kafka 流式引入
有关从 Kafka 流式引入的示例,请参阅 read_kafka。
授予用户对流式处理表的访问权限
若要授予用户对流式处理表的 SELECT
特权,以便他们可以对其进行查询,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
有关授予 Unity Catalog 安全对象特权的详细信息,请参阅 Unity Catalog 特权和安全对象。
使用查询历史记录监视运行
可以使用查询历史记录页访问查询详细信息和查询配置文件,这些查询配置文件可帮助你识别用于运行流式处理表更新的 Delta 实时表管道中性能不佳的查询和瓶颈。 有关查询历史记录和查询配置文件中可用的信息的概述,请参阅 查询历史记录 和 查询配置文件。
重要
此功能目前以公共预览版提供。 工作区管理员可以从“预览版”页启用此功能。 请参阅管理 Azure Databricks 预览版。
与流式处理表相关的所有语句都显示在查询历史记录中。 可以使用 语句 下拉列表筛选器来选择任何命令并检查相关查询。 所有 CREATE
语句后跟在 REFRESH
增量实时表管道上异步执行的语句。 这些 REFRESH
语句通常包括详细的查询计划,用于提供优化性能的见解。
若要访问 REFRESH
查询历史记录 UI 中的语句,请使用以下步骤:
- 单击 左侧栏中以打开 查询历史记录 UI。
- 从“语句”下拉列表筛选器中选择“REFRESH”复选框。
- 单击查询语句的名称可查看摘要详细信息,例如查询持续时间和聚合指标。
- 单击“查看查询配置文件”打开查询配置文件。 有关导航查询配置文件的详细信息,请参阅 查询配置文件 。
- (可选)可以使用“查询源”部分中的链接打开相关的查询或管道。
还可以使用 SQL 编辑器中的链接或附加到 SQL 仓库的笔记本访问查询详细信息。
注意
流式处理表必须配置为使用 预览 通道运行。 请参阅 “设置运行时通道”。