CREATE EXTERNAL STREAM (Transact-SQL)
Внимание
Azure SQL Edge будет прекращена 30 сентября 2025 г. Дополнительные сведения и параметры миграции см. в уведомлении о выходе на пенсию.
Примечание.
Azure SQL Edge больше не поддерживает платформу ARM64.
Объект EXTERNAL STREAM имеет два назначения: для входного и выходного потока. Его можно использовать в качестве входных данных для запроса потоковых данных из служб приема событий, таких как Центры событий Azure, Центр Интернета вещей Azure (или Пограничный концентратор) или Kafka или его можно использовать в качестве выходных данных, чтобы указать, где и как хранить результаты из потокового запроса.
Внешний ПОТОК также можно указать и создать как выходные данные, так и входные данные для таких служб, как Центры событий или хранилище BLOB-объектов. Это позволяет создавать сценарии с цепочкой, в которых запрос потоковой передачи сохраняет результаты во внешнем потоке в качестве выходных данных, а другой потоковый запрос считывает результаты из того же внешнего потока в качестве входных данных.
В настоящее время SQL Azure для пограничных вычислений поддерживает только следующие источники данных в качестве входных потоков и потоков вывода.
Тип источника данных | Входные данные | Выходные данные | Description |
---|---|---|---|
Центр Azure IoT Edge | Y | Y | Источник данных для чтения и записи потоковых данных в центр Azure IoT Edge. Дополнительные сведения см. в разделе Центр IoT Edge. |
База данных SQL | N | Y | Соединение с источником данных для записи потоковых данных в Базу данных SQL. Базой данных может быть локальная база данных в SQL Azure для пограничных вычислений, удаленная база данных в SQL Server или База данных SQL Azure. |
Kafka | Y | N | Источник данных для чтения потоковых данных из раздела Kafka. |
Синтаксис
CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH ( <with_options> )
<column_definition> ::=
column_name <column_data_type>
<data_type> ::=
[ type_schema_name . ] type_name
[ ( precision [ , scale ] | max ) ]
<with_options> ::=
DATA_SOURCE = data_source_name ,
LOCATION = location_name ,
[ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
[ <optional_input_options> ] ,
[ <optional_output_options> ] ,
TAGS = <tag_column_value>
<optional_input_options> ::=
INPUT_OPTIONS = ' [ <input_options_data> ] '
<Input_option_data> ::=
<input_option_values> [ , <input_option_values> ]
<input_option_values> ::=
PARTITIONS: [ number_of_partitions ]
| CONSUMER_GROUP: [ consumer_group_name ]
| TIME_POLICY: [ time_policy ]
| LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
| OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
<optional_output_options> ::=
OUTPUT_OPTIONS = ' [ <output_option_data> ] '
<output_option_data> ::=
<output_option_values> [ , <output_option_values> ]
<output_option_values> ::=
REJECT_POLICY: [ reject_policy ]
| MINIMUM_ROWS: [ row_value ]
| MAXIMUM_TIME: [ time_value_minutes ]
| PARTITION_KEY_COLUMN: [ partition_key_column_name ]
| PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| PARTITION_KEY: [ partition_key_name ]
| ROW_KEY: [ row_key_name ]
| BATCH_SIZE: [ batch_size_value ]
| MAXIMUM_BATCH_COUNT: [ batch_value ]
| STAGING_AREA: [ blob_data_source ]
<tag_column_value> ::= -- Reserved for Future Usage
);
Аргументы
DATA_SOURCE
Дополнительные сведения см. в DATA_SOURCE.
FILE_FORMAT
Дополнительные сведения см. в FILE_FORMAT.
LOCATION
Задает имя для фактических данных или расположения в источнике данных.
- Для центра Edge и объектов потока Kafka аргумент LOCATION указывает имя центра Edge или раздел Kafka для чтения или записи.
- Для объектов потока SQL (SQL Server, База данных SQL Azure или Azure SQL Edge) расположение указывает имя таблицы. Если поток создается в той же базе данных и схеме, что и целевая таблица, достаточно имени таблицы. В противном случае необходимо полностью указать имя таблицы (
<database_name>.<schema_name>.<table_name>
). - Для объекта потока Хранилища BLOB-объектов Azure аргумент LOCATION ссылается на шаблон пути, используемый внутри контейнера BLOB-объектов. Дополнительные сведения см. в разделе "Выходные данные" из Azure Stream Analytics.
INPUT_OPTIONS
Укажите параметры в качестве пар "ключ-значение" для таких служб, как Kafka и Центры IoT Edge, которые являются входными данными для потоковых запросов.
РАЗДЕЛОВ:
Количество секций, определенных для раздела. Максимальное количество секций, которые можно использовать, ограничено 32 (применяется к потокам ввода Kafka).
CONSUMER_GROUP.
Концентратор событий и центры Интернета вещей ограничивают число читателей в одной группе потребителей (до пяти). Если оставить это поле пустым, будет использоваться группа потребителей $Default.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
TIME_POLICY.
Описывает, следует ли удалять события или настраивать время продолжительности события, если поздние или неупорядоченные события передают свое значение допуска.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
LATE_EVENT_TOLERANCE.
Максимально допустимая задержка времени. Задержка представляет разницу между меткой времени события и системными часами.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
OUT_OF_ORDER_EVENT_TOLERANCE.
После того как события перешли из входных данных в запрос потоковой передачи, они могут поступать не по порядку. Эти события можно принять как есть или приостановить процесс на определенное время, чтобы изменить их порядок.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
OUTPUT_OPTIONS
Указывает параметры в качестве пар "ключ — значение" для поддерживаемых служб, которые являются выходными данными для запросов потоковой передачи.
REJECT_POLICY: DROP | СНОВА ПРОБОВАТЬ
Вид политик обработки ошибок данных при возникновении ошибок преобразования данных.
- Применяется ко всем поддерживаемым выходным данным.
MINIMUM_ROWS.
Минимальное количество строк, необходимых для каждого пакета, записанного в выходные данные. Для Parquet для каждого пакета создается новый файл.
- Применяется ко всем поддерживаемым выходным данным.
MAXIMUM_TIME.
Максимальное время ожидания на пакет в минутах. После этого пакет будет записан в выходные данные, даже если минимальные требования к строкам не выполнены.
- Применяется ко всем поддерживаемым выходным данным.
PARTITION_KEY_COLUMN.
Столбец, который используется для ключа секции.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
PROPERTY_COLUMNS.
Разделенный запятыми список имен выходных столбцов, присоединенных к сообщениям в качестве настраиваемых свойств, если это указано.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
SYSTEM_PROPERTY_COLUMNS.
Коллекция пар "имя — значение" для имен системных свойств и выходных столбцов, которая будет заполнена на основе сообщений служебной шины, в формате JSON. Например,
{ "MessageId": "column1", "PartitionKey": "column2" }
.- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
PARTITION_KEY.
Имя выходного столбца, содержащего ключ раздела. Ключ раздела — это уникальный идентификатор раздела в пределах конкретной таблицы, являющийся первой частью первичного ключа сущности. Это строковое значение, которое может составлять до 1 КБ в размере.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
ROW_KEY.
Имя выходного столбца, содержащего ключ строки. Ключ строки — это уникальный идентификатор сущности внутри конкретного раздела. Он является второй частью первичного ключа сущности. Ключ строки — это строковое значение размером до 1 КБ.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
BATCH_SIZE.
Представляет количество транзакций для хранилища таблиц, в которых максимальное число строк может доходить до 100. Для Функции Azure представляет размер пакета в байтах, отправляемого в функцию за один вызов. Значение по умолчанию — 256 КБ.
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
MAXIMUM_BATCH_COUNT.
Максимальное количество событий, отправленных в функцию, на вызов для Функции Azure. Значение по умолчанию — 100. Для Базы данных SQL представляет максимальное число записей, отправленных с каждой транзакцией массовой вставки. По умолчанию — 10 000.
- Применяется ко всем выходным данным на основе SQL.
STAGING_AREA: объект EXTERNAL DATA SOURCE в хранилище BLOB-объектов
Промежуточная область приема данных с высокой пропускной способностью в Azure Synapse Analytics
- Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
Дополнительные сведения о поддерживаемых параметрах входных и выходных данных, соответствующих типу источника данных, см. в статье "Обзор входных данных" и "Общие сведения о выходных данных Azure Stream Analytics".
Примеры
Пример A: EdgeHub
Тип: входные или выходные данные.
CREATE EXTERNAL DATA SOURCE MyEdgeHub
WITH (LOCATION = 'edgehub://');
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (FORMAT_TYPE = JSON);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyEdgeHub,
FILE_FORMAT = myFileFormat,
LOCATION = '<mytopicname>',
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Пример B: База данных SQL Azure, Azure SQL Edge, SQL Server
Тип: выходные данные
CREATE DATABASE SCOPED CREDENTIAL SQLCredName
WITH IDENTITY = '<user>',
SECRET = '<password>';
-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = '<my_server_name>.database.windows.net',
CREDENTIAL = SQLCredName
);
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = ' <sqlserver://<ipaddress>,<port>',
CREDENTIAL = SQLCredName
);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyTargetSQLTable,
LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
--Note: If table is contained in the database, <TableName> should be sufficient
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Пример C: Kafka
Тип: входные данные
CREATE EXTERNAL DATA SOURCE MyKafka_tweets
WITH (
--The location maps to KafkaBootstrapServer
LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
CREDENTIAL = kafkaCredName
);
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (
FORMAT_TYPE = JSON,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
);
CREATE EXTERNAL STREAM Stream_A (
user_id VARCHAR,
tweet VARCHAR
)
WITH (
DATA_SOURCE = MyKafka_tweets,
LOCATION = '<KafkaTopicName>',
FILE_FORMAT = myFileFormat,
INPUT_OPTIONS = 'PARTITIONS: 5'
);