使用增量实时表转换数据
本文介绍如何使用增量实时表声明数据集的转换,并指定如何通过查询逻辑处理记录。 其中还包含常用转换模式的示例,用于生成增量实时表管道。
可以针对返回数据帧的任何查询定义数据集。 可以使用 Apache Spark 内置操作、UDF、自定义逻辑和 MLflow 模型作为增量实时表管道中的转换。 将数据引入增量实时表管道后,可以针对上游源定义新的数据集,以创建新的流式表、具体化视图和视图。
如要了解如何使用 Delta Live Tables 有效地执行有状态处理,请参阅使用水印优化 Delta Live Tables 中的有状态处理。
何时使用视图、具体化视图和流式表
实现管道查询时,请选择最佳数据集类型以确保管道高效且可维护。
请考虑使用视图来执行以下操作:
- 将所需的大型或复杂查询分解为更易于管理的查询。
- 使用期望来验证中间结果。
- 减少不需要保留的结果的存储和计算成本。 由于表已具体化,因此它们需要额外的计算和存储资源。
在以下情况下考虑使用具体化视图:
- 多个下游查询使用表。 由于视图是按需计算的,因此每次查询视图时都会重新计算视图。
- 其他管道、作业或查询将使用表。 由于视图未具体化,因此你只能在同一个管道中使用它们。
- 你想要在开发过程中查看查询结果。 由于表已具体化并可以在管道外部查看和查询,因此在开发过程中使用表可帮助验证计算的正确性。 验证后,将不需要具体化的查询转换为视图。
在以下情况下考虑使用流式表:
- 查询是针对持续或以增量方式增长的数据源定义的。
- 应以增量方式计算查询结果。
- 管道需要高吞吐量和低延迟。
注意
流式处理表始终是针对流式处理源定义的。 你还可以将流式处理源与 APPLY CHANGES INTO
结合使用以应用 CDC 源中的更新。 请参阅 APPLY CHANGES API:使用增量实时表简化变更数据捕获。
从目标架构中排除表
如果必须计算不供外部使用的中间表,可以使用 TEMPORARY
关键字阻止将其发布到架构。 临时表仍然根据增量实时表语义存储和处理数据,但你不应在当前管道之外访问临时表。 临时表在创建它的管道的生存期内保持不变。 使用以下语法声明临时表:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
在单个管道中结合使用流式表和具体化视图
流式表继承了 Apache Spark 结构化流式处理的处理保证,并配置为处理来自仅追加数据源的查询,其中的新行始终插入到源表中,而不会经过修改。
注意
虽然默认情况下,流式表要求使用“仅追加”数据源,但当流式处理源是另一个需要更新或删除的流式表时,可以使用 skipChangeCommits 标志改写此行为。
常见的流式处理模式涉及到引入源数据以在管道中创建初始数据集。 这些初始数据集通常称为 bronze 表,通常用于执行简单的转换。
相比之下,管道中的最终表(通常称为 gold 表)通常需要复杂的聚合,或从作为 APPLY CHANGES INTO
操作目标的源中读取。 由于这些操作本质上是创建更新而不是追加,因此不支持将它们作为流式表的输入。 这些转换更适合具体化视图。
通过将流式表和具体化视图混合到单个管道中,可以简化管道并避免对原始数据进行代价高昂的重新引入或重新处理,还能充分利用 SQL 来计算经过有效编码和筛选的数据集上的复杂聚合。 以下示例演示了这种类型的混合处理:
注意
这些示例使用自动加载程序从云存储加载文件。 若要在启用了 Unity Catalog 的管道中使用自动加载程序加载文件,必须使用外部位置。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用。
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
详细了解如何使用自动加载程序从 Azure 存储增量地引入 JSON 文件。
流静态联接
在使用主要静态维度表对连续仅追加数据流进行非规范化时,流静态联接是一个不错的选择。
每次更新管道时,流中的新记录都会与静态表的最新快照联接在一起。 如果在处理流式表中的相应数据后在静态表中添加或更新记录,则除非执行完全刷新,否则不会重新计算最终的记录。
在配置为触发执行的管道中,静态表返回更新开始时的结果。 在配置为连续执行的管道中,每次表处理更新时,就会查询静态表的最新版本。
下面是流静态联接的示例:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
高效计算聚合
可以使用流式表来以增量方式计算简单的分布聚合(例如计数、最小值、最大值或总和)和代数聚合(例如平均值或标准偏差)。 Databricks 建议对组数量有限的查询进行增量聚合,例如,包含 GROUP BY country
子句的查询。 每次更新时仅读取新的输入数据。
若要详细了解如何编写执行增量聚合的 Delta Live Tables 查询,请参阅使用水印执行窗口聚合。
在增量实时表管道中使用 MLflow 模型
注意
若要在启用了 Unity Catalog 的管道中使用 MLflow 模型,必须将管道配置为使用 preview
通道。 若要使用 current
通道,必须将管道配置为发布到 Hive 元存储。
可以在增量实时表管道中使用 MLflow 训练的模型。 MLflow 模型在 Azure Databricks 中被视为转换,这意味着,它们将作用于 Spark 数据帧输入并将结果作为 Spark 数据帧返回。 由于增量实时表针对数据帧定义数据集,因此你只需编写几行代码即可将使用 MLflow 的 Apache Spark 工作负载转换为增量实时表。 有关 MLflow 的详细信息,请参阅使用 MLflow 进行 ML 生命周期管理。
如果你已有一个调用 MLflow 模型的 Python 笔记本,你可以使用 @dlt.table
修饰器并确保将函数定义为返回转换结果,使此代码适应增量实时表。 增量实时表默认不安装 MLflow,因此请确认已安装 MLFlow 库%pip install mlflow
,并在笔记本顶部导入并已导入mlflow
dlt
。 有关 Delta Live Tables 语法的简介,请参阅 使用 Python 开发管道代码。
若要在增量实时表中使用 MLflow 模型,请完成以下步骤:
- 获取 MLflow 模型的运行 ID 和模型名称。 运行 ID 和模型名称用于构造 MLflow 模型的 URI。
- 使用 URI 定义 Spark UDF,以加载 MLflow 模型。
- 在表定义中调用 UDF 以使用 MLflow 模型。
以下示例演示了此模式的基本语法:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
下面是一个完整的代码示例,它定义一个名为 loaded_model_udf
的 Spark UDF,用于加载一个基于贷款风险数据训练的 MLflow 模型。 用于预测的数据列作为参数传递给该 UDF。 表 loan_risk_predictions
计算 loan_risk_input_data
中每一行的预测。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
保留手动删除或更新
增量实时表允许手动删除或更新表中的记录,以及执行刷新操作以重新计算下游表。
默认情况下,增量实时表会在每次更新管道时根据输入数据重新计算表结果,因此必须确保不会从源数据中重新加载已删除的记录。 将 pipelines.reset.allowed
表属性设置为 false
可防止对表进行刷新,但不能防止对表进行增量写入,或防止新数据流入表中。
下图演示了使用两个流式表的示例:
raw_user_table
从源中引入原始用户数据。bmi_table
使用raw_user_table
中的重量和高度以增量方式计算 BMI 评分。
你想要手动删除或更新 raw_user_table
中的用户记录并重新计算 bmi_table
。
以下代码演示了将 pipelines.reset.allowed
表属性设置为 false
以禁用 raw_user_table
完全刷新,以便一直保留所需的更改,但在运行管道更新时重新计算下游表:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);