你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
在 Azure SQL Edge 中创建数据流作业
重要
Azure SQL Edge 将于 2025 年 9 月 30 日停用。 有关详细信息和迁移选项,请参阅停用通知。
注意
Azure SQL Edge 不再支持 ARM64 平台。
本文介绍了如何在 Azure SQL Edge 中创建 T-SQL 流式处理作业。 你将创建外部流输入和输出对象,然后在创建流式处理作业的过程中定义流式处理作业查询。
配置外部流输入和输出对象
T-SQL 流式处理使用 SQL Server 的外部数据源功能,来定义与流式处理作业的外部流输入和输出相关联的数据源。 使用以下 T-SQL 命令创建外部流输入或输出对象:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
此外,如果将 Azure SQL Edge、SQL Server 或 Azure SQL 数据库用作输出流,则需要使用 CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL)。 此 T-SQL 命令定义用于访问数据库的凭据。
支持的输入和输出流数据源
Azure SQL Edge 目前仅支持以下数据源作为流输入和输出。
数据源类型 | 输入 | 输出 | 说明 |
---|---|---|---|
Azure IoT Edge 中心 | Y | Y | 将流式处理数据读/写到 Azure IoT Edge 中心的数据源。 有关详细信息,请参阅 IoT Edge 中心。 |
SQL 数据库 | N | Y | 将流式处理数据写入 SQL 数据库的数据源连接。 数据库可以是 Azure SQL Edge 中的本地数据库,也可以是 SQL Server 或 Azure SQL 数据库中的远程数据库。 |
Kafka | Y | N | 从 Kafka 主题读取流式处理数据的数据源。 |
示例:为 Azure IoT Edge 中心创建外部流输入/输出对象
以下示例为 Azure IoT Edge 中心创建外部流对象。 若要为 Azure IoT Edge 中心创建外部流输入/输出数据源,首先也需针对要读取或写入的数据的布局创建一个外部文件格式。
创建 JSON 类型的外部文件格式。
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
为 Azure IoT Edge 中心创建外部数据源。 以下 T-SQL 脚本创建与 IoT Edge 中心的数据源连接,该中心与 Azure SQL Edge 在同一 Docker 主机上运行。
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
为 Azure IoT Edge 中心创建外部流对象。 以下 T-SQL 脚本为 IoT Edge 中心创建流对象。 对于 IoT Edge 中心流对象,LOCATION 参数是要读取或写入的 IoT Edge 中心主题或通道的名称。
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
示例:创建 Azure SQL 数据库的外部流对象
下面的示例为 Azure SQL Edge 中的本地数据库创建了一个外部流对象。
在数据库上创建主密钥。 这是加密凭据密钥所必需的。
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
创建用于访问 SQL Server 源的、以数据库为作用域的凭据。 下面的示例为外部数据源创建一个凭据,其中 IDENTITY = username 且 SECRET = password。
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:
- 创建名为 LocalSQLOutput 的外部数据源。
- 标识外部数据源 (
LOCATION = '<vendor>://<server>[:<port>]'
)。 在示例中,它指向 Azure SQL Edge 的本地实例。 - 使用先前创建的凭据。
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
创建外部流对象。 下面的示例创建一个指向 MySQLDatabase 数据库中的 dbo.TemperatureMeasurements 表的外部流对象。
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
示例:为 Kafka 创建外部流对象
下面的示例为 Azure SQL Edge 中的本地数据库创建了一个外部流对象。 此示例假设已将 kafka 服务器配置为使用匿名访问。
使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
为 Kafka 输入创建外部文件格式。 以下示例创建了采用 GZipped 压缩的 JSON 文件格式。
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
创建外部流对象。 以下示例创建指向 Kafka 主题
TemperatureMeasurement
的外部流对象。CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
创建流式处理作业和流式处理查询
使用 sys.sp_create_streaming_job
系统存储过程来定义流式处理查询并创建流式处理作业。 sp_create_streaming_job
存储过程采用以下参数:
@job_name
:流式处理作业的名称。 流式处理作业名称在实例中是唯一的。@statement
:基于流分析查询语言的流式处理查询语句。
下面的示例创建一个简单的流式处理作业,其中包含一个流式处理查询。 此查询从 IoT Edge 中心读取输入,并写入到数据库中的 dbo.TemperatureMeasurements
。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
以下示例创建一个更复杂的流式处理作业,其中包含多个不同查询。 这些查询中有一个查询使用内置的 AnomalyDetection_ChangePoint
函数来识别温度数据中的异常。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
启动、停止、删除和监视流式处理作业
若要在 Azure SQL Edge 中启动流式处理作业,请运行 sys.sp_start_streaming_job
存储过程。 该存储过程需要使用要启动的流式处理作业的名称作为输入。
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
若要停止流式处理作业,请运行 sys.sp_stop_streaming_job
存储过程。 该存储过程需要使用要停止的流式处理作业的名称作为输入。
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
若要丢弃(或删除)流式处理作业,请运行 sys.sp_drop_streaming_job
存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
若要获取流式处理作业的当前状态,请运行 sys.sp_get_streaming_job
存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。 它输出流式处理作业的名称和当前状态。
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
流式处理作业可以处于以下任一状态:
状态 | 说明 |
---|---|
创建 | 流式处理作业已创建,但尚未启动。 |
正在启动 | 流式处理作业处于开始阶段。 |
空闲 | 流式处理作业正在运行,但没有要处理的输入。 |
Processing | 流式处理作业正在运行,且正在处理输入。 此状态指示流式处理作业的正常运行状态。 |
已降级 | 流式处理作业正在运行,但在处理输入期间出现一些非致命错误。 输入作业将继续运行,但将删除遇到错误的输入。 |
已停止 | 流式处理作业已停止。 |
Failed | 流式处理作业失败。 这通常表示在处理过程中出现灾难性错误。 |
注意
由于流式处理作业是异步执行的,因此作业可能会在运行时遇到错误。 若要排查流式处理作业失败问题,请使用 sys.sp_get_streaming_job
存储过程,或查看来自 Azure SQL Edge 容器的 Docker 日志,该容器可以提供流式处理作业中的错误详细信息。