Dela via


Hämta data från Apache Flink

Apache Flink är ett ramverk och en distribuerad bearbetningsmotor för tillståndskänsliga beräkningar över obundna och begränsade dataströmmar.

Flink-anslutningsappen är ett öppen källkod projekt som kan köras på valfritt Flink-kluster. Den implementerar datamottagare för att flytta data från ett Flink-kluster. Med anslutningsappen till Apache Flink kan du skapa snabba och skalbara program som riktar sig till datadrivna scenarier, till exempel maskininlärning (ML), Extract-Transform-Load (ETL) och Log Analytics.

I den här artikeln får du lära dig hur du använder Flink-anslutningsappen för att skicka data från Flink till tabellen. Du skapar en tabell och datamappning, dirigerar Flink för att skicka data till tabellen och validerar sedan resultatet.

Förutsättningar

För Flink-projekt som använder Maven för att hantera beroenden integrerar du Flink Connector Core Sink for Azure Data Explorer genom att lägga till den som ett beroende:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

För projekt som inte använder Maven för att hantera beroenden klonar du lagringsplatsen för Azure Data Explorer Connector för Apache Flink och skapar den lokalt. Med den här metoden kan du manuellt lägga till anslutningsappen till din lokala Maven-lagringsplats med kommandot mvn clean install -DskipTests.

Autentisera

Du kan autentisera från Flink till att använda ett Microsoft Entra-ID-program.

Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Du beviljar senare behörigheter för tjänstens huvudnamn för åtkomst till Kusto-resurser.

  1. Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.

    az login
    
  2. Välj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Skapa tjänstens huvudnamn. I det här exemplet kallas my-service-principaltjänstens huvudnamn .

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Från de returnerade JSON-data kopierar du appId, passwordoch tenant för framtida användning.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.

  1. Ge programmet användarbehörighet för databasen:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Bevilja programmet antingen ingestor- eller administratörsbehörigheter i tabellen. Vilka behörigheter som krävs beror på den valda metoden för dataskrivning. Ingestor-behörigheter räcker för SinkV2, medan WriteAndSink kräver administratörsbehörigheter.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Mer information om auktorisering finns i Kusto rollbaserad åtkomstkontroll.

Så här skriver du data från Flink:

  1. Importera de alternativ som krävs:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Använd ditt program för att autentisera.

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurera mottagarparametrarna, till exempel databas och tabell:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Du kan lägga till fler alternativ enligt beskrivningen i följande tabell:

    Alternativ Description Standardvärde
    IngestionMappingRef Refererar till en befintlig inmatningsmappning.
    FlushImmediately Rensar data omedelbart och kan orsaka prestandaproblem. Den här metoden rekommenderas inte.
    BatchIntervalMs Styr hur ofta data rensas. 30 sekunder
    BatchSize Anger batchstorleken för buffring av poster före tömning. 1 000 poster
    ClientBatchSizeLimit Anger storleken i MB för aggregerade data före inmatning. 300 MB
    PollForIngestionStatus Om sant söker anslutningsappen efter inmatningsstatus efter dataspolning. falskt
    DeliveryGuarantee Avgör semantik för leveransgaranti. Om du vill uppnå exakt en gång semantik använder du WriteAheadSink. AT_LEAST_ONCE
  2. Skriva strömmande data med någon av följande metoder:

    • SinkV2: Det här är ett tillståndslöst alternativ som rensar data vid kontrollpunkten, vilket garanterar minst en gång konsekvens. Vi rekommenderar det här alternativet för datainmatning med hög volym.
    • WriteAheadSink: Den här metoden genererar data till en KustoSink. Den är integrerad med Flinks kontrollpunktssystem och erbjuder exakt en gång garantier. Data lagras i en AbstractStateBackend och bekräftas först när en kontrollpunkt har slutförts.

    I följande exempel används SinkV2. Om du vill använda WriteAheadSink använder du buildWriteAheadSink metoden i stället för build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Den fullständiga koden bör se ut ungefär så här:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Kontrollera att data matas in

När anslutningen har konfigurerats skickas data till tabellen. Du kan kontrollera att data matas in genom att köra en KQL-fråga.

  1. Kör följande fråga för att kontrollera att data matas in i tabellen:

    <MyTable>
    | count
    
  2. Kör följande fråga för att visa data:

    <MyTable>
    | take 100