Condividi tramite


CREATE EXTERNAL STREAM (Transact-SQL)

Importante

SQL Edge di Azure verrà ritirato il 30 settembre 2025. Per altre informazioni e per le opzioni di migrazione, vedere l'avviso di ritiro.

Nota

SQL Edge di Azure non supporta più la piattaforma ARM64.

L'oggetto EXTERNAL STREAM ha una duplice funzione: è sia un flusso di input che di output. Può essere usato come input per eseguire query sui dati di streaming da servizi di inserimento di eventi come Hub eventi di Azure, hub IoT di Azure o Hub Edge o Kafka oppure può essere usato come output per specificare dove e come archiviare i risultati di una query di streaming.

È anche possibile specificare e creare un EXTERNAL STREAM sia come output che come input per servizi come Hub eventi o archiviazione BLOB. Ciò semplifica gli scenari di concatenamento in cui una query di streaming rende persistenti i risultati nel flusso esterno come output e un'altra query di streaming legge dallo stesso flusso esterno come input.

SQL Edge di Azure attualmente supporta solo le origini dati seguenti come output e input di flusso.

Tipo di origine dati Input Output Descrizione
Hub di Azure IoT Edge S S Origine dati per leggere e scrivere dati in streaming su un hub di Azure IoT Edge. Per altre informazioni, vedere hub di IoT Edge.
Database SQL N Y Connessione all'origine dati per scrivere i dati di streaming nel database SQL. Il database può essere un database locale in SQL Edge di Azure o un database remoto in SQL Server o nel database SQL di Azure.
Kafka Y N Origine dati per la lettura dei dati in streaming da un argomento Kafka.

Sintassi

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Argomenti

DATA_SOURCE

Per altre informazioni, vedere DATA_SOURCE.

FILE_FORMAT

Per altre informazioni, vedere FILE_FORMAT.

LOCATION

specifica il nome dei dati effettivi o della posizione nell'origine dati.

  • Per gli oggetti flusso hub di Edge o Kafka, il percorso specifica il nome dell'argomento hub di Edge o Kafka in cui leggere o scrivere.
  • Per gli oggetti di flusso SQL (SQL Server, database SQL di Azure o SQL Edge di Azure), il percorso specifica il nome della tabella. Se il flusso viene creato nello stesso database e nello stesso schema della tabella di destinazione, è sufficiente solo il nome tabella. In caso contrario, è necessario qualificare completamente il nome della tabella (<database_name>.<schema_name>.<table_name>).
  • Per la posizione dell'oggetto di flusso di Archiviazione BLOB di Azure fa riferimento al modello di percorso da usare all'interno del contenitore BLOB. Per altre informazioni, vedere Output di Analisi di flusso di Azure.

INPUT_OPTIONS

Specificare le opzioni come coppie chiave-valore per servizi come Kafka e hub IoT Edge che sono input per le query di streaming.

  • PARTIZIONI:

    Numero di partizioni definite per un argomento. Il numero massimo di partizioni che è possibile usare è limitato a 32 (si applica ai flussi di input Kafka).

    • CONSUMER_GROUP:

      gli hub eventi e gli hub IoT limitano il numero di lettori in un gruppo di consumer (a 5). Se si lascia vuoto questo campo, verrà usato il gruppo di consumer '$Default'.

      • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
    • TIME_POLICY:

      descrive se eliminare gli eventi o modificare l'ora dell'evento in presenza di eventi in ritardo o eventi non in ordine che superano il relativo valore di tolleranza.

      • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
    • LATE_EVENT_TOLERANCE:

      ritardo massimo accettabile. Il ritardo rappresenta la differenza tra il timestamp dell'evento e l'orologio di sistema.

      • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      gli eventi possono arrivare non in ordine dopo aver seguito il percorso dall'input alla query di streaming. Questi eventi possono essere accettati così come sono oppure è possibile scegliere di sospenderli per un periodo di tempo determinato per riordinarli.

      • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.

OUTPUT_OPTIONS

specificare le opzioni come coppie chiave-valore per i servizi supportati usati come output per le query di streaming

  • REJECT_POLICY: DROP | RETRY

    Specifica i criteri di gestione degli errori dei dati quando si verificano errori di conversione dei dati.

    • Si applica a tutti gli output supportati.
  • MINIMUM_ROWS:

    numero minimo di righe necessarie per ogni batch scritto in un output. Per Parquet, ogni batch crea un nuovo file.

    • Si applica a tutti gli output supportati.
  • MAXIMUM_TIME:

    Tempo massimo di attesa in minuti per batch. Dopo questo periodo di tempo, il batch verrà scritto sull'output anche se il requisito minimo di righe non viene soddisfatto.

    • Si applica a tutti gli output supportati.
  • PARTITION_KEY_COLUMN:

    colonna usata per la chiave di partizione.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • PROPERTY_COLUMNS:

    Elenco delimitato da virgole dei nomi delle colonne di output che sono associate ai messaggi come proprietà personalizzate, se specificato.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • SYSTEM_PROPERTY_COLUMNS:

    raccolta in formato JSON di coppie nome-valore dei nomi delle proprietà di sistema e delle colonne di output da popolare nei messaggi del bus di servizio. Ad esempio: { "MessageId": "column1", "PartitionKey": "column2" }.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • PARTITION_KEY:

    Nome della colonna di output contenente la chiave di partizione. La chiave di partizione è un identificatore univoco per la partizione in una determinata tabella che costituisce la prima parte della chiave primaria di un'entità. È un valore stringa che può avere una dimensione massima di 1 kB.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • ROW_KEY:

    Nome della colonna di output contenente la chiave di riga. La chiave di riga è un identificatore univoco per un’entità all'interno di una determinata partizione. Costituisce la seconda parte della chiave primaria di un'entità. La chiave di riga è un valore stringa le cui dimensioni massime sono di 1 KB.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • BATCH_SIZE:

    rappresenta il numero di transazioni per l'archiviazione tabelle in cui il massimo può raggiungere 100 record. Per Funzioni di Azure rappresenta le dimensioni del batch in byte inviate alla funzione per ogni chiamata. Il valore predefinito è 256 kB.

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.
  • MAXIMUM_BATCH_COUNT:

    numero massimo di eventi inviati alla funzione per ogni chiamata. Il valore predefinito è 100. Per il database SQL, rappresenta il numero massimo di record inviati con ogni transazione di inserimento bulk. Il valore predefinito è 10.000.

    • Si applica a tutti gli output basati su SQL
  • STAGING_AREA: oggetto EXTERNAL DATA SOURCE nell'archivio BLOB

    Area di gestione temporanea per l'inserimento di dati ad alta velocità effettiva in Azure Synapse Analytics

    • Riservato all'uso futuro. Non si applica a SQL Edge di Azure.

Per altre informazioni sulle opzioni di input e output supportate corrispondenti al tipo di origine dati, vedere rispettivamente Panoramica di Analisi di flusso di Azure - Panoramica dell'input e Analisi di flusso di Azure - Panoramica degli output.

Esempi

Esempio A: EdgeHub

Tipo: input o output.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Esempio B: Database SQL di Azure, SQL Edge di Azure, SQL Server

Tipo: Output

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Esempio C: Kafka

Tipo: Input

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );