使用 SQL 开发管道代码
Delta Live Tables 引入了多个新的 SQL 关键字和函数,用于在管道中定义具体化视图和流式处理表。 SQL 对开发管道的支持基于 Spark SQL 的基础知识,并添加了对结构化流式处理功能的支持。
熟悉 PySpark DataFrame 的用户可能更喜欢使用 Python 开发管道代码。 Python 支持更复杂的测试和操作,这些测试和操作难以通过 SQL 实现,例如元编程操作。 请参阅 使用 Python 开发管道代码。
有关 Delta Live Tables SQL 语法的完整参考,请参阅 Delta Live Tables SQL 语言参考。
用于管道开发的 SQL 基础知识
创建 Delta Live Tables 数据集的 SQL 代码使用 CREATE OR REFRESH
语法根据查询结果定义具体化视图和流式处理表。
关键字 STREAM
指示是否应使用流式处理语义读取子句中 SELECT
引用的数据源。
Delta Live Tables 源代码与 SQL 脚本严重不同:Delta Live Tables 在管道中配置的所有源代码文件中评估所有数据集定义,并在运行任何查询之前生成数据流图。 笔记本或脚本中显示的查询顺序不定义执行顺序。
使用 SQL 创建具体化视图
下面的代码示例演示了使用 SQL 创建具体化视图的基本语法:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
使用 SQL 创建流式处理表
下面的代码示例演示了使用 SQL 创建流式处理表的基本语法:
注意
并非所有数据源都支持流式读取,某些数据源应始终使用流式处理语义进行处理。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
从对象存储加载数据
增量实时表支持从 Azure Databricks 所支持的所有格式加载数据。 请参阅数据格式选项。
注意
这些示例使用自动装载到工作区中的数据 /databricks-datasets
。 Databricks 建议使用卷路径或云 URI 来引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?。
Databricks 建议在针对存储在云对象存储中的数据配置增量引入工作负荷时使用自动加载器和流式处理表。 请参阅什么是自动加载程序?。
SQL 使用该 read_files
函数调用自动加载程序功能。 还必须使用STREAM
关键字来配置流式读取。read_files
以下示例使用自动加载程序从 JSON 文件创建流式处理表:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
该 read_files
函数还支持批处理语义来创建具体化视图。 以下示例使用批处理语义读取 JSON 目录并创建具体化视图:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
使用预期验证数据
可以使用预期来设置和强制实施数据质量约束。 请参阅使用 Delta Live Tables 管理数据质量。
以下代码定义了一个预期,该期望将 valid_data
删除在数据引入过程中为 null 的记录:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
查询管道中定义的具体化视图和流式处理表
使用 LIVE
架构查询管道中定义的其他具体化视图和流式处理表。
以下示例定义四个数据集:
- 一个用于
orders
加载 JSON 数据的流式处理表。 - 一个名为
customers
加载 CSV 数据的具体化视图。 - 一个具体化视图,用于
customer_orders
联接来自和数据集的customers
orders
记录,将顺序时间戳强制转换为日期,然后选择customer_id
“、order_number
state
”和order_date
“字段”。 - 一个具体化视图,用于
daily_orders_by_state
聚合每个状态的每日订单计数。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;