CREATE EXTERNAL STREAM (Transact-SQL)
Important
Azure SQL Edge sera mis hors service le 30 septembre 2025. Pour plus d’informations et pour connaître les options de migration, consultez l’Avis de mise hors service.
Remarque
Azure SQL Edge ne prend plus en charge la plateforme ARM64.
L’objet EXTERNAL STREAM (flux externe) fait office à la fois de flux d’entrée et de flux de sortie. Il peut être utilisé comme entrée pour interroger les données en diffusion continue auprès de services d’ingestion d’événements comme Azure Event Hubs, Azure IoT Hub (ou Edge Hub) ou Kafka. Il peut aussi être utilisé comme sortie pour spécifier où et comment stocker les résultats d’une requête de diffusion en continu.
Un FLUX EXTERNE peut également être spécifié et créé en tant que sortie et entrée pour des services comme Event Hubs ou le Stockage Blob. Cela facilite les scénarios de chaînage où une requête de streaming conserve les résultats dans le flux externe en tant que sortie et une autre requête de streaming lit le même flux externe en tant qu'entrée.
Actuellement, Azure SQL Edge prend uniquement en charge les sources de données suivantes en tant qu’entrées et sorties de flux.
Type de source de données | Entrée | Sortie | Description |
---|---|---|---|
Hub Azure IoT Edge | A | A | Source de données pour lire et écrire des données de streaming dans un hub Azure IoT Edge. Pour plus d’informations, consultez l’article IoT Edge Hub. |
SQL Database | N | O | Connexion à la source de données pour écrire les données de streaming dans SQL Database. La base de données peut être une base de données locale dans Azure SQL Edge, ou une base de données distante dans SQL Server ou Azure SQL Database. |
Kafka | O | N | Source de données pour lire les données de streaming à partir d’une rubrique Kafka. |
Syntaxe
CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH ( <with_options> )
<column_definition> ::=
column_name <column_data_type>
<data_type> ::=
[ type_schema_name . ] type_name
[ ( precision [ , scale ] | max ) ]
<with_options> ::=
DATA_SOURCE = data_source_name ,
LOCATION = location_name ,
[ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
[ <optional_input_options> ] ,
[ <optional_output_options> ] ,
TAGS = <tag_column_value>
<optional_input_options> ::=
INPUT_OPTIONS = ' [ <input_options_data> ] '
<Input_option_data> ::=
<input_option_values> [ , <input_option_values> ]
<input_option_values> ::=
PARTITIONS: [ number_of_partitions ]
| CONSUMER_GROUP: [ consumer_group_name ]
| TIME_POLICY: [ time_policy ]
| LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
| OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
<optional_output_options> ::=
OUTPUT_OPTIONS = ' [ <output_option_data> ] '
<output_option_data> ::=
<output_option_values> [ , <output_option_values> ]
<output_option_values> ::=
REJECT_POLICY: [ reject_policy ]
| MINIMUM_ROWS: [ row_value ]
| MAXIMUM_TIME: [ time_value_minutes ]
| PARTITION_KEY_COLUMN: [ partition_key_column_name ]
| PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| PARTITION_KEY: [ partition_key_name ]
| ROW_KEY: [ row_key_name ]
| BATCH_SIZE: [ batch_size_value ]
| MAXIMUM_BATCH_COUNT: [ batch_value ]
| STAGING_AREA: [ blob_data_source ]
<tag_column_value> ::= -- Reserved for Future Usage
);
Arguments
DATA_SOURCE
Pour plus d’informations, consultez DATA_SOURCE.
FILE_FORMAT
Pour plus d’informations, consultez FILE_FORMAT.
LOCATION
Spécifie le nom des données ou de l’emplacement réels dans la source de données.
- Pour les objets de flux Edge Hub ou Kafka, l’emplacement indique le nom de la rubrique Edge Hub ou Kafka où lire et écrire.
- Pour les objets de flux SQL (SQL Server, Azure SQL Database ou Azure SQL Edge), l’emplacement indique le nom de la table. Si le flux est créé dans la même base de données et le même schéma que la table de destination, seul le nom de la table suffit. Sinon, vous devez qualifier entièrement le nom de la table (
<database_name>.<schema_name>.<table_name>
). - Pour un objet de flux Stockage Blob Azure, l’emplacement fait référence au modèle de chemin à utiliser dans le conteneur d’objets blob. Pour plus d’informations, consultez Sorties d’Azure Stream Analytics.
INPUT_OPTIONS
Spécifie les options en tant que paires clé-valeur pour des services comme Kafka et IoT Edge Hubs qui font office d’entrées pour les requêtes de diffusion en continu.
PARTITIONS :
Nombre de partitions définies pour une rubrique. Le nombre maximal de partitions qui peuvent être utilisées est limité à 32 (s’applique aux flux d’entrée Kafka).
CONSUMER_GROUP :
Event Hub et IoT Hub limitent le nombre de lecteurs au sein d'un groupe de consommateurs (à 5). Laissez ce champ vide pour utiliser le groupe de consommateurs « $Default ».
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
TIME_POLICY :
Indique s’il convient de supprimer des événements ou d'ajuster l’heure des événements lorsque des événements tardifs ou en désordre dépassent leur valeur de tolérance.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
LATE_EVENT_TOLERANCE :
Délai maximal acceptable. Ce délai correspond à la différence entre le timestamp de l’événement et l’horloge système.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
OUT_OF_ORDER_EVENT_TOLERANCE :
Des événements peuvent arriver en désordre après avoir transité de l'entrée vers la requête de streaming. Vous pouvez accepter ces événements tels quels ou choisir de les suspendre pendant une période définie afin de les mettre dans l'ordre souhaité.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
OUTPUT_OPTIONS
Spécifie les options en tant que paires clé-valeur pour les services pris en charge correspondant à des entrées de requêtes de streaming
REJECT_POLICY : DROP | RETRY
Spécifie les stratégies en matière de gestion des erreurs de données lorsque des erreurs de conversion de données se produisent.
- S’applique à toutes les sorties prises en charge.
MINIMUM_ROWS :
Nombre minimal de lignes requises par lot écrit dans une sortie. Pour Parquet, chaque lot crée un fichier.
- S’applique à toutes les sorties prises en charge.
MAXIMUM_TIME :
Délai d’attente maximal par lot en minutes. À l’issue de cette période, le lot est écrit dans la sortie même si l’exigence de lignes minimum n’est pas remplie.
- S’applique à toutes les sorties prises en charge.
PARTITION_KEY_COLUMN :
Colonne utilisée pour la clé de partition.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
PROPERTY_COLUMNS :
Une liste des noms séparés par des virgules dénotant des colonnes de sortie attachées aux messages en tant que propriétés personnalisées, le cas échéant.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
SYSTEM_PROPERTY_COLUMNS :
Collection JSON de paires nom/valeur de noms de propriétés système et de colonnes de sortie à remplir sur les messages Service Bus. Par exemple :
{ "MessageId": "column1", "PartitionKey": "column2" }
.- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
PARTITION_KEY :
Nom de la colonne de sortie contenant la clé de partition. La clé de partition est un identificateur unique pour la partition dans une table donnée qui constitue la première partie de la clé primaire d’une entité. Il s’agit d’une valeur de chaîne pouvant atteindre 1 Ko.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
ROW_KEY :
Nom de la colonne de sortie contenant la clé de ligne. La clé de ligne est un identificateur unique pour une entité dans une partition donnée. Elle constitue la deuxième partie de la clé primaire d'une entité. La clé de ligne est une valeur de chaîne qui peut atteindre 1 Ko.
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
BATCH_SIZE :
Correspond au nombre de transactions pour le stockage de table avec un maximum de 100 enregistrements. Pour Azure Functions, il s’agit de la taille du lot en octets envoyé à la fonction par appel, la valeur (256 ko par défaut).
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
MAXIMUM_BATCH_COUNT :
Nombre maximal d'événements envoyés à la fonction par appel pour la fonction Azure (100 par défaut). Pour SQL Database, cela représente le nombre maximal d'enregistrements envoyés avec chaque transaction d'insertion en bloc (10 000 par défaut).
- S’applique à toutes les sorties basées sur SQL
STAGING_AREA : objet EXTERNAL DATA SOURCE vers le Stockage Blob
La zone de transit pour l’ingestion de données à haut débit dans Azure Synapse Analytics
- Réservé pour un usage ultérieur. Ne s’applique pas à Azure SQL Edge.
Pour plus d’informations sur les options d’entrée et de sortie prises en charge en fonction du type de source de données, consultez respectivement Azure Stream Analytics - Vue d’ensemble des entrées et Azure Stream Analytics - Vue d’ensemble des sorties.
Exemples
Exemple A : EdgeHub
Type : entrée ou sortie.
CREATE EXTERNAL DATA SOURCE MyEdgeHub
WITH (LOCATION = 'edgehub://');
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (FORMAT_TYPE = JSON);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyEdgeHub,
FILE_FORMAT = myFileFormat,
LOCATION = '<mytopicname>',
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Exemple B : Azure SQL Database, Azure SQL Edge, SQL Server
Type : sortie
CREATE DATABASE SCOPED CREDENTIAL SQLCredName
WITH IDENTITY = '<user>',
SECRET = '<password>';
-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = '<my_server_name>.database.windows.net',
CREDENTIAL = SQLCredName
);
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = ' <sqlserver://<ipaddress>,<port>',
CREDENTIAL = SQLCredName
);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyTargetSQLTable,
LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
--Note: If table is contained in the database, <TableName> should be sufficient
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Exemple C : Kafka
Type : entrée
CREATE EXTERNAL DATA SOURCE MyKafka_tweets
WITH (
--The location maps to KafkaBootstrapServer
LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
CREDENTIAL = kafkaCredName
);
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (
FORMAT_TYPE = JSON,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
);
CREATE EXTERNAL STREAM Stream_A (
user_id VARCHAR,
tweet VARCHAR
)
WITH (
DATA_SOURCE = MyKafka_tweets,
LOCATION = '<KafkaTopicName>',
FILE_FORMAT = myFileFormat,
INPUT_OPTIONS = 'PARTITIONS: 5'
);