Condividi tramite


Inserire dati con Apache Flink in Azure Esplora dati

Apache Flink è un potente framework e un motore di elaborazione distribuito per eseguire i calcoli con stato su flussi di dati non associati e delimitati.

Il connettore Flink è un progetto open-source che può essere eseguito in qualsiasi cluster Flink. Implementa il sink di dati per lo spostamento di dati da un cluster Flink. Usando il connettore ad Apache Spark è possibile compilare applicazioni scalabili e veloci destinate a scenari basati sui dati, ad esempio Machine Learning (ML), Extract-Transform-Load (ETL) e Log Analytics.

Questo articolo mostra come usare il connettore Flink per inviare dati da Flink alla tabella. Inizialmente si creerà una tabella e il mapping dei dati, quindi si istruirà Flink a inviare i dati nella tabella e convalidare i risultati.

Prerequisiti

Per i progetti Flink che usano Maven per gestire le dipendenze, integrare il Flink Connector Core Sink per Esplora dati di Azure aggiungendolo come dipendenza:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Per i progetti che non usano Maven per gestire le dipendenze, clonare il repository per Azure Data Explorer Connector per Apache Flink e compilarlo in locale. Questo approccio consente di aggiungere manualmente il connettore al repository Maven locale usando il comando mvn clean install -DskipTests.

È possibile eseguire l'autenticazione da Flink a usando un'applicazione Microsoft Entra ID o un'identità gestita.

Questa entità servizio sarà l'identità usata dal connettore per scrivere dati nella tabella in Kusto. Successivamente si concedono le autorizzazioni a questa entità servizio per accedere alle risorse Kusto.

  1. Accedere alla sottoscrizione di Azure usando l'interfaccia della riga di comando di Azure. Eseguire quindi l'autenticazione nel browser.

    az login
    
  2. Scegliere la sottoscrizione per ospitare l'entità di sicurezza. Questo passaggio è necessario quando si hanno più sottoscrizioni.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Creare l'entità servizio. In questo esempio l'entità servizio viene chiamata my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dai dati JSON restituiti copiare appId, password e tenant per un uso futuro.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

L'applicazione Microsoft Entra e l'entità servizio sono state create.

  1. Concedere all'utente dell'applicazione le autorizzazioni per il database:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Concedere all'applicazione autorizzazioni di ingestor o admin per la tabella. Le autorizzazioni necessarie dipendono dal metodo di scrittura dei dati scelto. Le autorizzazioni di ingestor sono sufficienti per SinkV2, mentre WriteAndSink richiede autorizzazioni di amministratore.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Per maggiori informazioni, vedere Autorizzazione Kusto con il controllo degli accessi in base al ruolo.

Per scrivere dati da Flink:

  1. Selezionare le opzioni necessarie:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Usare l'applicazione o l'identità gestita per l'autenticazione.

    Per l'autenticazione dell'applicazione:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Per l'autenticazione dell'identità gestita:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configurare i parametri sink, ad esempio database e tabella:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    È possibile scegliere tra le opzioni descritte nella tabella riportata di seguito:

    Opzione Descrizione Valore predefinito
    IngestionMappingRef Fa riferimento a un mapping di inserimento esistente.
    FlushImmediately Scarica immediatamente i dati e può causare problemi di prestazioni. Non è consigliabile usare questo metodo.
    BatchIntervalMs Controlla la frequenza con cui vengono scaricati i dati. 30 secondi
    BatchSize Imposta le dimensioni del batch per il buffering dei record prima dello scaricamento. 1.000 record
    ClientBatchSizeLimit Specifica le dimensioni in MB di dati aggregati prima dell'inserimento. 300 MB
    PollForIngestionStatus Se vero, il connettore esegue il polling dello stato di inserimento dopo lo scaricamento dei dati. false
    DeliveryGuarantee Determina la semantica della garanzia di recapito. Per ottenere esattamente una semantica, usare WriteAheadSink. AT_LEAST_ONCE
  2. Scrivere dati di streaming con uno dei metodi seguenti:

    • SinkV2: si tratta di un'opzione senza stato che scarica i dati nel checkpoint, assicurando almeno una volta la coerenza. È consigliabile usare questa opzione per l'inserimento di dati con volumi elevati.
    • WriteAheadSink: questo metodo genera dati in un KustoSink. È integrato con il sistema di checkpoint Flink e offre esattamente una volta garanzie. I dati vengono archiviati in un oggetto AbstractStateBackend ed il commit viene eseguito solo dopo il completamento di un checkpoint.

    Nell'esempio seguente viene utilizzato SinkV2. Per usare WriteAheadSink, usare il buildWriteAheadSink metodo anziché build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Il codice completo dovrebbe apparire come segue:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Verificare che i dati siano inseriti

Dopo aver configurato la connessione, i dati vengono inviati alla tabella. È possibile verificare che i dati vengano inseriti eseguendo una query KQL.

  1. Eseguire la query seguente per verificare che i dati vengano inseriti nella tabella:

    <MyTable>
    | count
    
  2. Eseguire il seguente codice per visualizzare i dati:

    <MyTable>
    | take 100