Del via


Hent data fra Apache Flink

Apache Flink er et rammeverk og distribuert behandlingsmotor for tilstandsfulle beregninger over ubundne og avgrensede datastrømmer.

Flink-koblingen er et åpen kilde prosjekt som kan kjøre på en hvilken som helst Flink-klynge. Den implementerer datavask for flytting av data fra en Flink-klynge. Ved hjelp av koblingen til Apache Flink kan du bygge raske og skalerbare programmer rettet mot datadrevne scenarioer, for eksempel maskinlæring (ML), Extract-Transform-Load (ETL) og Log Analytics.

I denne artikkelen lærer du hvordan du bruker Flink-koblingen til å sende data fra Flink til tabellen. Du oppretter en tabell- og datatilordning, dirigerer Flink til å sende data inn i tabellen, og deretter validerer du resultatene.

Forutsetning

For Flink-prosjekter som bruker Maven til å administrere avhengigheter, integrerer du Flink Connector Core Sink For Azure Data Explorer ved å legge den til som en avhengighet:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

For prosjekter som ikke bruker Maven til å administrere avhengigheter, kloner du repositoriet for Azure Data Explorer Connector for Apache Flink og bygger det lokalt. Med denne fremgangsmåten kan du legge til koblingen manuelt i det lokale Maven-repositoriet ved hjelp av kommandoen mvn clean install -DskipTests.

Godkjenn

Du kan godkjenne fra Flink til å bruke et Microsoft Entra ID-program.

Denne tjenestekontohaveren vil være identiteten som brukes av koblingen til å skrive data i tabellen i Kusto. Du vil senere gi tillatelser for denne tjenestekontohaveren for å få tilgang til Kusto-ressurser.

  1. Logg på Azure-abonnementet via Azure CLI. Deretter godkjenner du i nettleseren.

    az login
    
  2. Velg abonnementet som vert for hovedstolen. Dette trinnet er nødvendig når du har flere abonnementer.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Opprett tjenestekontohaveren. I dette eksemplet kalles my-service-principaltjenestekontohaveren .

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopier , passwordog tenant for fremtidig bruk, appIdfra de returnerte JSON-dataene.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Du har opprettet Microsoft Entra-programmet og tjenestekontohaveren.

  1. Gi programbrukertillatelsene i databasen:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Gi programmet enten ingestor- eller administratortillatelser i tabellen. De nødvendige tillatelsene avhenger av den valgte metoden for dataskriving. Ingestor-tillatelser er tilstrekkelige for SinkV2, mens WriteAndSink krever administratortillatelser.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Hvis du vil ha mer informasjon om autorisasjon, kan du se Rollebasert tilgangskontroll for Kusto.

Slik skriver du data fra Flink:

  1. Importer de nødvendige alternativene:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Bruk programmet til å godkjenne.

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurer synkeparameterne, for eksempel database og tabell:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Du kan legge til flere alternativer, som beskrevet i tabellen nedenfor:

    Alternativ Bekrivelse Standardverdi
    InntakMappingRef Refererer til en eksisterende inntakstilordning.
    FlushImmediately Tømmer data umiddelbart, og kan føre til ytelsesproblemer. Denne metoden anbefales ikke.
    BatchIntervalMs Styrer hvor ofte data tømmes. 30 sekunder
    BatchSize Angir partistørrelsen for bufring av poster før tømming. 1 000 oppføringer
    ClientBatchSizeLimit Angir størrelsen i MB av aggregerte data før inntak. 300 MB
    PollForIngestionStatus Hvis sann, avspørrer koblingen for inntaksstatus etter dataspyling. usann
    DeliveryGuarantee Bestemmer semantikk for leveringsgaranti. Hvis du vil oppnå nøyaktig én gang semantikk, kan du bruke WriteAheadSink. AT_LEAST_ONCE
  2. Skriv strømming av data med én av følgende metoder:

    • SinkV2: Dette er et tilstandsløst alternativ som tømmer data på kontrollpunkt, noe som sikrer minst én gang konsekvens. Vi anbefaler dette alternativet for datainntak med høyt volum.
    • WriteAheadSink: Denne metoden sender data til en KustoSink. Den er integrert med Flinks kontrollpunktsystem og tilbyr nøyaktig én gang garantier. Data lagres i en AbstractStateBackend og utføres først etter at et kontrollpunkt er fullført.

    Følgende eksempel bruker SinkV2. Hvis du vil bruke WriteAheadSink, bruker buildWriteAheadSink du metoden i stedet buildfor :

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Den fullstendige koden skal se omtrent slik ut:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Kontroller at dataene er inntatt

Når tilkoblingen er konfigurert, sendes data til tabellen. Du kan bekrefte at dataene inntar ved å kjøre en KQL-spørring.

  1. Kjør følgende spørring for å bekrefte at dataene er inntatt i tabellen:

    <MyTable>
    | count
    
  2. Kjør følgende spørring for å vise dataene:

    <MyTable>
    | take 100