Compartilhar via


Integração do Azure Data Explorer e do Apache Flink®.

O Azure Data Explorer é uma plataforma de análise de Big Data totalmente gerenciada e de alto desempenho que facilita a análise de grandes volumes de dados quase em tempo real.

O ADX ajuda os usuários a analisar grandes volumes de dados de aplicativos de streaming, sites, dispositivos IoT etc. A integração do Apache Flink com o ADX ajuda você a processar dados em tempo real e analisá-los no ADX.

Pré-requisitos

  1. Criar o cluster do Flink.

  2. Crie o ADX com banco de dados e tabela, conforme necessário.

  3. Adicione permissões de ingestão para a identidade gerenciada em Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Execute um programa de exemplo definindo o URI (Uniform Resource Identifier) do cluster Kusto, o banco de dados e a identidade gerenciada usados, além da tabela que receberá as gravações.

  5. Clone o projeto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Crie a tabela no ADX usando o seguinte comando:

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Atualize o arquivo FlinkKustoSinkSample.java com o URI do cluster Kusto, o banco de dados e a identidade gerenciada corretos que são usados.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Posteriormente, crie o projeto usando “mvn clean package”.

  8. Localize o arquivo JAR “samples-java-1.0-SNAPSHOT-shaded.jar” na pasta “sample-java/target”, carregue-o na interface do usuário do Flink e envie o trabalho.

  9. Consulte a tabela Kusto para verificar a saída.

    screenshot shows query the Kusto table to verify the output.

    Não há atrasos na gravação dos dados do Flink na tabela Kusto.

Referência