Creare un processo di streaming di dati in SQL Edge di Azure
Importante
SQL Edge di Azure verrà ritirato il 30 settembre 2025. Per altre informazioni e per le opzioni di migrazione, vedere l'avviso di ritiro.
Nota
SQL Edge di Azure non supporta più la piattaforma ARM64.
Questo articolo illustra come creare un processo di Streaming T-SQL in SQL Edge di Azure. Si creano gli oggetti di input e output del flusso esterno e quindi si definisce la query del processo di streaming come parte della creazione del processo di streaming.
Creare gli oggetti di input e output del flusso esterno
Streaming T-SQL usa la funzionalità dell'origine dati esterna di SQL Server per definire le origini dati associate agli input e agli output del flusso esterno del processo di streaming. Per creare un oggetto di input o di output del flusso esterno, usare i seguenti comandi T-SQL.
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
Inoltre, se SQL Edge di Azure, SQL Server o il database SQL di Azure viene usato come flusso di output, è necessario CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). Questo comando T-SQL definisce le credenziali per accedere al database.
Origini dati dei flussi di input e di output supportati
SQL Edge di Azure attualmente supporta solo le origini dati seguenti come output e input di flusso.
Tipo di origine dati | Input | Output | Descrizione |
---|---|---|---|
Hub Azure IoT Edge | S | S | Origine dati per la lettura e scrittura dei dati di streaming in un hub Azure IoT Edge. Per altre informazioni, vedere Azure IoT Edge. |
Database SQL | N | Y | Connessione all'origine dati per scrivere i dati di streaming nel database SQL. Il database può essere un database locale in SQL Edge di Azure o un database remoto in SQL Server o nel database SQL di Azure. |
Kafka | Y | N | Origine dati per la lettura dei dati in streaming da un argomento Kafka. |
Esempio: Creare un oggetto di input/output del flusso esterno per l'hub di Azure IoT Edge
Il seguente esempio crea un oggetto di flusso esterno per l'hub IoT Edge di Azure. Per creare un'origine dati di input/output del flusso esterno per l'hub di Azure IoT Edge, è prima di tutto necessario creare un formato di file esterno per SQL per comprendere anche il layout dei dati letti o scritti.
Creare un formato di file esterno del tipo JSON.
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
Creare un'origine dati esterna per l'hub IoT Edge di Azure. Lo script T-SQL seguente crea una connessione all'origine dati a un hub IoT Edge eseguito nello stesso host Docker di SQL Edge di Azure.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
Creare l'oggetto flusso esterno per l'hub di IoT Edge. Il seguente script T-SQL crea un oggetto flusso per l'hub IoT Edge. Nel caso di un oggetto flusso dell'hub di Edge, il parametro LOCATION è il nome dell'argomento o canale dell'hub di Edge in lettura o scrittura.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
Esempio: Creare un oggetto flusso esterno nel database SQL di Azure
Il seguente esempio crea un oggetto flusso esterno al database locale in SQL Edge di Azure.
Creare una chiave master nel database. Questo passaggio è necessario per crittografare il segreto delle credenziali.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
Creare le credenziali con ambito database per l'accesso all'origine SQL Server. Nell'esempio seguente viene creata una credenziale per l'origine dati esterna con IDENTITY = 'nomeutente' e SECRET = 'password'.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:
- Crea un'origine dati esterna denominata LocalSQLOutput.
- Identifica l'origine dati esterna (
LOCATION = '<vendor>://<server>[:<port>]'
). Nell'esempio, punta a un'istanza locale di SQL Edge di Azure. - Usa le credenziali create in precedenza.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta a una tabella dbo.TemperatureMeasurements nel database MySQLDatabase.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Esempio: Creare un oggetto flusso esterno per Kafka
Il seguente esempio crea un oggetto flusso esterno al database locale in SQL Edge di Azure. In questo esempio si presuppone che il server kafka sia configurato per l'accesso anonimo.
Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
Creare un formato di file esterno per l'input Kafka. L'esempio seguente ha creato un formato di file JSON con compressione GZipped.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta all'argomento Kafka
TemperatureMeasurement
.CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
Creare il processo di streaming e le query di streaming
Usare la sys.sp_create_streaming_job
stored procedure di sistema per definire le query di streaming e creare il processo di streaming. La sp_create_streaming_job
stored procedure accetta i parametri seguenti:
@job_name
: Nome del processo di streaming. I nomi dei processi di streaming sono univoci nell'istanza.@statement
: Istruzioni di query di streaming basate sul Linguaggio di query di Analisi di flusso.
L'esempio seguente crea un processo di streaming semplice con una query di streaming. Questa query legge gli input dall'hub IoT Edge e scrive a dbo.TemperatureMeasurements
nel database.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
Il seguente esempio crea un processo di streaming più complesso con più query diverse. Queste query includono una che usa la funzione predefinita AnomalyDetection_ChangePoint
per identificare le anomalie nei dati relativi alla temperatura.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
Avviare, arrestare, eliminare e monitorare i processi di streaming
Per avviare un processo di streaming in SQL Edge di Azure, eseguire la sys.sp_start_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da avviare, come input.
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
Per arrestare un processo di streaming, eseguire la sys.sp_stop_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da arrestare, come input.
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
Per eliminare o cancellare un processo di streaming, eseguire la sys.sp_drop_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input.
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
Per ottenere lo stato corrente di un processo di streaming, eseguire la sys.sp_get_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input. Restituisce il nome e lo stato corrente del processo di streaming.
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
Il processo di streaming può trovarsi in uno degli stati seguenti:
Stato | Descrizione |
---|---|
Data di creazione | Il processo di streaming è stato creato, ma non è ancora stato avviato. |
Avvio | Il processo di streaming si trova nella fase di avvio. |
Idle | Il processo di streaming è in esecuzione, ma non è disponibile alcun input da elaborare. |
in lavorazione | Il processo di streaming è in esecuzione ed è in corso l'elaborazione degli input. Questo stato indica uno stato integro per il processo di streaming. |
Degraded | Il processo di streaming è in esecuzione, ma si sono verificati alcuni errori non irreversibili durante l'elaborazione dell'input. L'esecuzione del processo di input continuerà, ma verranno eliminati gli input per cui si verificano errori. |
Arrestato | Il processo di streaming è stato arrestato. |
Non riuscito | Il processo di streaming non è riuscito. Si tratta in genere di un'indicazione di un errore irreversibile durante l'elaborazione. |
Nota
Poiché il processo di streaming viene eseguito in modo asincrono, il processo potrebbe riscontrare errori in fase di esecuzione. Per risolvere un errore del processo di streaming, usare la sys.sp_get_streaming_job
stored procedure o esaminare il log Docker dal contenitore SQL Edge di Azure, che può fornire i dettagli dell'errore del processo di streaming.