CREATE EXTERNAL STREAM (Transact-SQL)
Belangrijk
Azure SQL Edge wordt op 30 september 2025 buiten gebruik gesteld. Zie de kennisgeving buitengebruikstelling voor meer informatie en migratieopties.
Notitie
Azure SQL Edge biedt geen ondersteuning meer voor het ARM64-platform.
Het OBJECT EXTERNAL STREAM heeft een dubbel doel van zowel een invoer- als uitvoerstroom. Het kan worden gebruikt als invoer voor het opvragen van streaminggegevens van services voor gebeurtenisopname, zoals Azure Event Hubs, Azure IoT Hub (of Edge Hub) of Kafka, of kan worden gebruikt als uitvoer om op te geven waar en hoe resultaten van een streamingquery moeten worden opgeslagen.
Een EXTERNAL STREAM kan ook worden opgegeven en gemaakt als uitvoer en invoer voor services zoals Event Hubs of Blob Storage. Dit vereenvoudigt ketenscenario's waarbij een streamingquery resultaten naar de externe stream bewaart als uitvoer en een andere streamingquery die uit dezelfde externe stroom leest als invoer.
Azure SQL Edge ondersteunt momenteel alleen de volgende gegevensbronnen als stroominvoer en -uitvoer.
Gegevensbrontype | Invoer | Uitvoer | Beschrijving |
---|---|---|---|
Azure IoT Edge-hub | J | J | Gegevensbron voor het lezen en schrijven van streaminggegevens naar een Azure IoT Edge-hub. Zie IoT Edge Hub voor meer informatie. |
SQL Database | N | J | Gegevensbronverbinding voor het schrijven van streaminggegevens naar SQL Database. De database kan een lokale database zijn in Azure SQL Edge of een externe database in SQL Server of Azure SQL Database. |
Kafka | J | N | Gegevensbron voor het lezen van streaminggegevens uit een Kafka-onderwerp. |
Syntaxis
CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH ( <with_options> )
<column_definition> ::=
column_name <column_data_type>
<data_type> ::=
[ type_schema_name . ] type_name
[ ( precision [ , scale ] | max ) ]
<with_options> ::=
DATA_SOURCE = data_source_name ,
LOCATION = location_name ,
[ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
[ <optional_input_options> ] ,
[ <optional_output_options> ] ,
TAGS = <tag_column_value>
<optional_input_options> ::=
INPUT_OPTIONS = ' [ <input_options_data> ] '
<Input_option_data> ::=
<input_option_values> [ , <input_option_values> ]
<input_option_values> ::=
PARTITIONS: [ number_of_partitions ]
| CONSUMER_GROUP: [ consumer_group_name ]
| TIME_POLICY: [ time_policy ]
| LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
| OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
<optional_output_options> ::=
OUTPUT_OPTIONS = ' [ <output_option_data> ] '
<output_option_data> ::=
<output_option_values> [ , <output_option_values> ]
<output_option_values> ::=
REJECT_POLICY: [ reject_policy ]
| MINIMUM_ROWS: [ row_value ]
| MAXIMUM_TIME: [ time_value_minutes ]
| PARTITION_KEY_COLUMN: [ partition_key_column_name ]
| PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| PARTITION_KEY: [ partition_key_name ]
| ROW_KEY: [ row_key_name ]
| BATCH_SIZE: [ batch_size_value ]
| MAXIMUM_BATCH_COUNT: [ batch_value ]
| STAGING_AREA: [ blob_data_source ]
<tag_column_value> ::= -- Reserved for Future Usage
);
Argumenten
DATA_SOURCE
Zie DATA_SOURCE voor meer informatie.
FILE_FORMAT
Zie FILE_FORMAT voor meer informatie.
LOCATIE
Hiermee geeft u de naam voor de werkelijke gegevens of locatie in de gegevensbron.
- Voor Edge Hub- of Kafka-streamobjecten geeft de locatie de naam op van het Edge Hub- of Kafka-onderwerp waaruit u wilt lezen of schrijven naar.
- Voor SQL-streamobjecten (SQL Server, Azure SQL Database of Azure SQL Edge) geeft de locatie de naam van de tabel op. Als de stream wordt gemaakt in dezelfde database en hetzelfde schema als de doeltabel, volstaat alleen de tabelnaam. Anders moet u de tabelnaam (
<database_name>.<schema_name>.<table_name>
) volledig kwalificeren. - Voor azure Blob Storage-streamobjectlocatie verwijst naar het padpatroon dat moet worden gebruikt in de blobcontainer. Zie Uitvoer van Azure Stream Analytics voor meer informatie.
INPUT_OPTIONS
Geef opties op als sleutel-waardeparen voor services zoals Kafka en IoT Edge Hubs, die invoer zijn voor streamingquery's.
PARTITIES:
Het aantal partities dat is gedefinieerd voor een onderwerp. Het maximum aantal partities dat kan worden gebruikt, is beperkt tot 32 (van toepassing op Kafka-invoerstromen).
CONSUMER_GROUP:
Gebeurtenis- en IoT Hubs beperken het aantal lezers binnen één consumentengroep (tot 5). Als u dit veld leeg laat, wordt de consumentengroep '$Default' gebruikt.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
TIME_POLICY:
Hierin wordt beschreven of gebeurtenissen moeten worden verwijderd of de tijd van de gebeurtenis moet worden aangepast wanneer late gebeurtenissen of niet-order-gebeurtenissen hun tolerantiewaarde doorgeven.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
LATE_EVENT_TOLERANCE:
De maximaal acceptabele tijdsvertraging. De vertraging vertegenwoordigt het verschil tussen de tijdstempel van de gebeurtenis en de systeemklok.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
OUT_OF_ORDER_EVENT_TOLERANCE:
Gebeurtenissen kunnen niet op volgorde aankomen nadat ze de reis hebben gemaakt van de invoer naar de streamingquery. Deze gebeurtenissen kunnen als zodanig worden geaccepteerd of u kunt ervoor kiezen om een bepaalde periode te onderbreken om ze opnieuw te ordenen.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
OUTPUT_OPTIONS
Opties opgeven als sleutel-waardeparen voor ondersteunde services die worden uitgevoerd voor streamingquery's
REJECT_POLICY: DROP | OPNIEUW
Soort de beleidsregels voor het verwerken van gegevensfouten wanneer er fouten optreden bij het converteren van gegevens.
- Is van toepassing op alle ondersteunde uitvoer.
MINIMUM_ROWS:
Minimale rijen die per batch zijn geschreven naar een uitvoer. Voor Parquet maakt elke batch een nieuw bestand.
- Is van toepassing op alle ondersteunde uitvoer.
MAXIMUM_TIME:
Maximale wachttijd in minuten per batch. Na deze tijd wordt de batch naar de uitvoer geschreven, zelfs als niet aan de minimumvereisten voor rijen wordt voldaan.
- Is van toepassing op alle ondersteunde uitvoer.
PARTITION_KEY_COLUMN:
De kolom die wordt gebruikt voor de partitiesleutel.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
PROPERTY_COLUMNS:
Een door komma's gescheiden lijst met de namen van uitvoerkolommen die zijn gekoppeld aan berichten als aangepaste eigenschappen, indien opgegeven.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
SYSTEM_PROPERTY_COLUMNS:
Een met JSON opgemaakte verzameling naam-/waardeparen van systeemeigenschapsnamen en uitvoerkolommen die moeten worden ingevuld in Service Bus-berichten. Bijvoorbeeld:
{ "MessageId": "column1", "PartitionKey": "column2" }
.- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
PARTITION_KEY:
De naam van de uitvoerkolom met de partitiesleutel. De partitiesleutel is een unieke id voor de partitie in een bepaalde tabel die het eerste deel van de primaire sleutel van een entiteit vormt. Het is een tekenreekswaarde die maximaal 1 kB groot kan zijn.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
ROW_KEY:
De naam van de uitvoerkolom met de rijsleutel. De rijsleutel is een unieke id voor een entiteit binnen een bepaalde partitie. Het vormt het tweede deel van de primaire sleutel van een entiteit. De rijsleutel is een tekenreekswaarde die maximaal 1 kB groot kan zijn.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
BATCH_SIZE:
Dit vertegenwoordigt het aantal transacties voor tabelopslag, waarbij het maximum maximaal 100 records kan bevatten. Voor Azure Functions vertegenwoordigt dit de batchgrootte in bytes die per aanroep naar de functie zijn verzonden. De standaardwaarde is 256 kB.
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
MAXIMUM_BATCH_COUNT:
Maximum aantal gebeurtenissen dat per aanroep naar de functie wordt verzonden voor de Azure-functie. De standaardwaarde is 100. Voor SQL Database vertegenwoordigt dit het maximum aantal records dat wordt verzonden met elke transactie voor bulksgewijs invoegen. De standaardwaarde is 10.000.
- Van toepassing op alle op SQL gebaseerde uitvoer
STAGING_AREA: EXTERNAL DATA SOURCE-object naar Blob Storage
Het faseringsgebied voor gegevensopname met hoge doorvoer in Azure Synapse Analytics
- Gereserveerd voor toekomstig gebruik. Is niet van toepassing op Azure SQL Edge.
Zie respectievelijk Azure Stream Analytics - Invoeroverzicht en Overzicht van Uitvoer voor meer informatie over ondersteunde invoer- en uitvoeropties die overeenkomen met het gegevensbrontype.
Voorbeelden
Voorbeeld A: EdgeHub
Type: Invoer of uitvoer.
CREATE EXTERNAL DATA SOURCE MyEdgeHub
WITH (LOCATION = 'edgehub://');
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (FORMAT_TYPE = JSON);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyEdgeHub,
FILE_FORMAT = myFileFormat,
LOCATION = '<mytopicname>',
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Voorbeeld B: Azure SQL Database, Azure SQL Edge, SQL Server
Type: Uitvoer
CREATE DATABASE SCOPED CREDENTIAL SQLCredName
WITH IDENTITY = '<user>',
SECRET = '<password>';
-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = '<my_server_name>.database.windows.net',
CREDENTIAL = SQLCredName
);
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = ' <sqlserver://<ipaddress>,<port>',
CREDENTIAL = SQLCredName
);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyTargetSQLTable,
LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
--Note: If table is contained in the database, <TableName> should be sufficient
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Voorbeeld C: Kafka
Type: Invoer
CREATE EXTERNAL DATA SOURCE MyKafka_tweets
WITH (
--The location maps to KafkaBootstrapServer
LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
CREDENTIAL = kafkaCredName
);
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (
FORMAT_TYPE = JSON,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
);
CREATE EXTERNAL STREAM Stream_A (
user_id VARCHAR,
tweet VARCHAR
)
WITH (
DATA_SOURCE = MyKafka_tweets,
LOCATION = '<KafkaTopicName>',
FILE_FORMAT = myFileFormat,
INPUT_OPTIONS = 'PARTITIONS: 5'
);