Partilhar via


CRIAR FLUXO EXTERNO (Transact-SQL)

Importante

O Azure SQL Edge será desativado em 30 de setembro de 2025. Para obter mais informações e opções de migração, consulte o Aviso de aposentadoria.

Nota

O Azure SQL Edge não suporta mais a plataforma ARM64.

O objeto EXTERNAL STREAM tem uma dupla finalidade de fluxo de entrada e saída. Ele pode ser usado como uma entrada para consultar dados de streaming de serviços de ingestão de eventos, como Hubs de Eventos do Azure, Hub IoT do Azure (ou Hub de Borda) ou Kafka, ou pode ser usado como uma saída para especificar onde e como armazenar resultados de uma consulta de streaming.

Um FLUXO EXTERNO também pode ser especificado e criado como saída e entrada para serviços como Hubs de Eventos ou armazenamento de Blob. Isso facilita o encadeamento de cenários em que uma consulta de streaming está persistindo resultados para o fluxo externo como saída e outra leitura de consulta de streaming do mesmo fluxo externo como entrada.

Atualmente, o SQL Edge do Azure dá suporte apenas às seguintes fontes de dados como entradas e saídas de fluxo.

Tipo de origem de dados Entrada Saída Description
Hub do Azure IoT Edge Y Y Fonte de dados para ler e gravar dados de streaming em um hub do Azure IoT Edge. Para obter mais informações, consulte IoT Edge Hub.
Base de Dados SQL N Y Conexão de fonte de dados para gravar dados de streaming no Banco de dados SQL. O banco de dados pode ser um banco de dados local no Azure SQL Edge ou um banco de dados remoto no SQL Server ou no Banco de Dados SQL do Azure.
Kafka Y N Fonte de dados para ler dados de streaming de um tópico de Kafka.

Sintaxe

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Argumentos

DATA_SOURCE

Para obter mais informações, consulte DATA_SOURCE.

FILE_FORMAT

Para obter mais informações, consulte FILE_FORMAT.

LOCALIZAÇÃO

Especifica o nome dos dados reais ou do local na fonte de dados.

  • Para objetos de fluxo do Hub de Borda ou Kafka, o local especifica o nome do tópico do Hub de Borda ou Kafka para ler ou gravar.
  • Para objetos de fluxo SQL (SQL Server, Banco de Dados SQL do Azure ou Azure SQL Edge), o local especifica o nome da tabela. Se o fluxo for criado no mesmo banco de dados e esquema que a tabela de destino, apenas o nome da tabela será suficiente. Caso contrário, você precisa qualificar totalmente o nome da tabela (<database_name>.<schema_name>.<table_name>).
  • Para o Armazenamento de Blob do Azure, o local do objeto de fluxo refere-se ao padrão de caminho a ser usado dentro do contêiner de blob. Para obter mais informações, consulte Saídas do Azure Stream Analytics.

INPUT_OPTIONS

Especifique opções como pares chave-valor para serviços como Kafka e IoT Edge Hubs, que são entradas para consultas de streaming.

  • DIVISÓRIAS:

    Número de partições definidas para um tópico. O número máximo de partições que podem ser usadas é limitado a 32 (Aplica-se a Kafka Input Streams).

    • CONSUMER_GROUP:

      Os Hubs de Eventos e IoT limitam o número de leitores dentro de um grupo de consumidores (a 5). Deixando este campo vazio, utilizar-se-á o grupo de consumidores «$Default».

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • TIME_POLICY:

      Descreve se os eventos devem ser descartados ou ajustados quando eventos atrasados ou eventos fora de ordem ultrapassam seu valor de tolerância.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • LATE_EVENT_TOLERANCE:

      O prazo máximo aceitável. O atraso representa a diferença entre o carimbo de data/hora do evento e o relógio do sistema.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      Os eventos podem chegar fora de ordem depois de terem feito a viagem desde a entrada até a consulta de streaming. Esses eventos podem ser aceitos como estão, ou você pode optar por pausar por um período definido para reordená-los.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.

OUTPUT_OPTIONS

Especificar opções como pares chave-valor para serviços suportados que são saídas para consultas de streaming

  • REJECT_POLICY: DROP | RETENTAR

    Espécies as políticas de tratamento de erros de dados quando ocorrem erros de conversão de dados.

    • Aplica-se a todas as saídas suportadas.
  • MINIMUM_ROWS:

    Linhas mínimas necessárias por lote gravado em uma saída. Para Parquet, cada lote cria um novo arquivo.

    • Aplica-se a todas as saídas suportadas.
  • MAXIMUM_TIME:

    Tempo máximo de espera em minutos por lote. Após esse tempo, o lote será gravado na saída, mesmo que o requisito mínimo de linhas não seja atendido.

    • Aplica-se a todas as saídas suportadas.
  • PARTITION_KEY_COLUMN:

    A coluna que é usada para a chave de partição.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • PROPERTY_COLUMNS:

    Uma lista separada por vírgulas dos nomes das colunas de saída anexadas às mensagens como propriedades personalizadas, se fornecidas.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • SYSTEM_PROPERTY_COLUMNS:

    Uma coleção formatada em JSON de pares nome/valor de nomes de propriedade do sistema e colunas de saída a serem preenchidas em mensagens do Service Bus. Por exemplo, { "MessageId": "column1", "PartitionKey": "column2" }.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • PARTITION_KEY:

    O nome da coluna de saída que contém a chave de partição. A chave de partição é um identificador exclusivo para a partição dentro de uma determinada tabela que forma a primeira parte da chave primária de uma entidade. É um valor de cadeia de caracteres que pode ter até 1 KB de tamanho.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • ROW_KEY:

    O nome da coluna de saída que contém a chave de linha. A chave de linha é um identificador exclusivo para uma entidade dentro de uma determinada partição. Forma a segunda parte da chave primária de uma entidade. A chave de linha é um valor de cadeia de caracteres que pode ter até 1 KB de tamanho.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • BATCH_SIZE:

    Isso representa o número de transações para armazenamento de tabelas, onde o máximo pode ir até 100 registros. Para o Azure Functions, isso representa o tamanho do lote em bytes enviados para a função por chamada - o padrão é 256 kB.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • MAXIMUM_BATCH_COUNT:

    Número máximo de eventos enviados para a função por chamada para a função do Azure - o padrão é 100. Para o Banco de dados SQL, isso representa o número máximo de registros enviados com cada transação de inserção em massa - o padrão é 10.000.

    • Aplica-se a todas as saídas baseadas em SQL
  • STAGING_AREA: Objeto EXTERNAL DATA SOURCE para armazenamento de Blob

    A área de preparo para ingestão de dados de alta taxa de transferência no Azure Synapse Analytics

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.

Para obter mais informações sobre as opções de entrada e saída com suporte correspondentes ao tipo de fonte de dados, consulte Azure Stream Analytics - Visão geral de entrada e Azure Stream Analytics - Visão geral de saídas, respectivamente.

Exemplos

Exemplo A: EdgeHub

Tipo: Entrada ou Saída.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Exemplo B: Banco de Dados SQL do Azure, Azure SQL Edge, SQL Server

Tipo: Saída

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Exemplo C: Kafka

Tipo: Entrada

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );