Vytvoření úlohy streamování dat v Azure SQL Edge
Důležité
Azure SQL Edge bude vyřazeno 30. září 2025. Další informace a možnosti migrace najdete v oznámení o vyřazení.
Poznámka:
Azure SQL Edge už nepodporuje platformu ARM64.
Tento článek vysvětluje, jak vytvořit úlohu streamování T-SQL v Azure SQL Edge. Vytvoříte externí vstupní a výstupní objekty datového proudu a pak definujete dotaz úlohy streamování jako součást vytváření úlohy streamování.
Konfigurace vstupních a výstupních objektů externího datového proudu
Streamování T-SQL používá funkci externího zdroje dat SQL Serveru k definování zdrojů dat přidružených ke vstupům externího datového proudu a výstupům úlohy streamování. K vytvoření externího vstupního nebo výstupního objektu datového proudu použijte následující příkazy T-SQL:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
Pokud se navíc Azure SQL Edge, SQL Server nebo Azure SQL Database používají jako výstupní datový proud, potřebujete PŘIHLAŠOVACÍ ÚDAJE CREATE DATABASE SCOPED (Transact-SQL). Tento příkaz T-SQL definuje přihlašovací údaje pro přístup k databázi.
Podporované zdroje dat vstupního a výstupního streamu
Azure SQL Edge v současné době podporuje jako vstupy a výstupy datových proudů pouze následující zdroje dat.
Typ zdroje dat | Vstup | Výstup | Popis |
---|---|---|---|
Centrum Azure IoT Edge | Y | Y | Zdroj dat pro čtení a zápis streamovaných dat do centra Azure IoT Edge. Další informace najdete v tématu IoT Edge Hub. |
SQL Database | N | Y | Připojení ke zdroji dat pro zápis streamovaných dat do služby SQL Database Databáze může být místní databáze v Azure SQL Edge nebo vzdálená databáze v SQL Serveru nebo Azure SQL Database. |
Kafka | Y | N | Zdroj dat ke čtení streamovaných dat z tématu Kafka |
Příklad: Vytvoření externího vstupního/výstupního objektu streamu pro centrum Azure IoT Edge
Následující příklad vytvoří objekt externího streamu pro centrum Azure IoT Edge. Pokud chcete vytvořit externí zdroj vstupních a výstupních dat datového proudu pro centrum Azure IoT Edge, musíte nejprve vytvořit formát externího souboru pro rozložení dat, která se čtou nebo zapisují.
Vytvořte formát externího souboru typu JSON.
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
Vytvořte externí zdroj dat pro centrum Azure IoT Edge. Následující skript T-SQL vytvoří připojení zdroje dat k centru IoT Edge, které běží na stejném hostiteli Dockeru jako Azure SQL Edge.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
Vytvořte objekt externího streamu pro centrum Azure IoT Edge. Následující skript T-SQL vytvoří objekt streamu pro centrum IoT Edge. V případě objektu streamu centra IoT Edge je parametr LOCATION názvem tématu centra IoT Edge nebo kanálu, do kterého se čte nebo zapisuje.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
Příklad: Vytvoření externího streamového objektu do azure SQL Database
Následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge.
Vytvořte v databázi hlavní klíč. To se vyžaduje k šifrování tajného klíče přihlašovacích údajů.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
Vytvořte přihlašovací údaje s oborem databáze pro přístup ke zdroji SQL Serveru. Následující příklad vytvoří přihlašovací údaje pro externí zdroj dat s IDENTITOU = uživatelské jméno a SECRET = heslo.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
Vytvořte externí zdroj dat pomocí příkazu CREATE EXTERNAL DATA SOURCE. Následující příklad:
- Vytvoří externí zdroj dat s názvem LocalSQLOutput.
- Identifikuje externí zdroj dat (
LOCATION = '<vendor>://<server>[:<port>]'
). V tomto příkladu odkazuje na místní instanci Azure SQL Edge. - Používá přihlašovací údaje vytvořené dříve.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
Vytvořte objekt externího datového proudu. Následující příklad vytvoří objekt externího datového proudu odkazující na tabulku dbo. TemperatureMeasurements v databázi MySQLDatabase.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Příklad: Vytvoření objektu externího streamu pro Kafka
Následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge. V tomto příkladu se předpokládá, že je server Kafka nakonfigurovaný pro anonymní přístup.
Vytvořte externí zdroj dat pomocí příkazu CREATE EXTERNAL DATA SOURCE. Následující příklad:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
Vytvořte pro vstup Kafka formát externího souboru. Následující příklad vytvořil formát souboru JSON s GZipped Compression.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
Vytvořte objekt externího datového proudu. Následující příklad vytvoří objekt externího datového proudu odkazující na téma
TemperatureMeasurement
Kafka .CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
Vytvoření úlohy streamování a dotazů streamování
sys.sp_create_streaming_job
Pomocí systémové uložené procedury definujte dotazy streamování a vytvořte úlohu streamování. Uložená procedura sp_create_streaming_job
přebírá následující parametry:
@job_name
: Název úlohy streamování. Názvy úloh streamování jsou v celé instanci jedinečné.@statement
: Stream Analytics Query Language – příkazy dotazů založené na streamovacím jazyce
Následující příklad vytvoří jednoduchou úlohu streamování s jedním dotazem streamování. Tento dotaz načte vstupy z centra IoT Edge a zapisuje do dbo.TemperatureMeasurements
databáze.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
Následující příklad vytvoří složitější úlohu streamování s několika různými dotazy. Mezi tyto dotazy patří jedna, která používá integrovanou AnomalyDetection_ChangePoint
funkci k identifikaci anomálií v datech o teplotě.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
Spuštění, zastavení, vyřazení a monitorování úloh streamování
Pokud chcete spustit úlohu streamování v Azure SQL Edge, spusťte uloženou proceduru sys.sp_start_streaming_job
. Uložená procedura vyžaduje, aby se jako vstup spustil název úlohy streamování.
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
Pokud chcete zastavit úlohu streamování, spusťte uloženou proceduru sys.sp_stop_streaming_job
. Uložená procedura vyžaduje, aby se jako vstup zastavil název úlohy streamování.
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
Pokud chcete odstranit (nebo odstranit) úlohu streamování, spusťte uloženou proceduru sys.sp_drop_streaming_job
. Uložená procedura vyžaduje, aby se jako vstup zobrazoval název úlohy streamování.
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
Pokud chcete získat aktuální stav úlohy streamování, spusťte uloženou proceduru sys.sp_get_streaming_job
. Uložená procedura vyžaduje, aby se jako vstup zobrazoval název úlohy streamování. Vypíše název a aktuální stav úlohy streamování.
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
Úloha streamování může mít některý z následujících stavů:
Status | Popis |
---|---|
Vytvořeno | Úloha streamování byla vytvořena, ale ještě nebyla spuštěna. |
Spouštění | Úloha streamování je ve počáteční fázi. |
Období | Úloha streamování je spuštěná, ale neexistuje žádný vstup ke zpracování. |
zpracovává se | Úloha streamování je spuštěná a zpracovává vstupy. Tento stav označuje stav, který je v pořádku pro úlohu streamování. |
Snížený výkon | Úloha streamování je spuštěná, ale během zpracování vstupu došlo k některým nefaktálním chybám. Vstupní úloha se bude dál spouštět, ale zahodí vstupy, u kterých dochází k chybám. |
Zastaveno | Úloha streamování byla zastavena. |
Neúspěšný | Úloha streamování selhala. Obvykle to značí závažnou chybu během zpracování. |
Poznámka:
Vzhledem k tomu, že úloha streamování je spuštěna asynchronně, může úloha narazit na chyby za běhu. K řešení potíží se selháním úlohy streamování použijte sys.sp_get_streaming_job
uloženou proceduru nebo zkontrolujte protokol Dockeru z kontejneru Azure SQL Edge, který může poskytnout podrobnosti o chybě z úlohy streamování.