Sdílet prostřednictvím


Příjem dat pomocí Apache Flinku do Azure Data Exploreru

Apache Flink je architektura a distribuovaný modul pro zpracování stavových výpočtů přes nevázané a vázané datové proudy.

Konektor Flink je opensourcový projekt , který se dá spustit v jakémkoli clusteru Flink. Implementuje datovou jímku pro přesun dat z clusteru Flink. Pomocí konektoru pro Apache Flink můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty, například strojové učení (ML), extrakce a transformace načítání (ETL) a Log Analytics.

V tomto článku se dozvíte, jak pomocí konektoru Flink odesílat data z Flinku do tabulky. Vytvoříte tabulku a mapování dat, nasměrujete Flink na odeslání dat do tabulky a pak ověříte výsledky.

Požadavky

Pro projekty Flink, které ke správě závislostí používají Maven, integrujte jímku jádra konektoru Flink pro Azure Data Explorer přidáním jako závislosti:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

U projektů, které ke správě závislostí nepoužívají Maven, naklonujte úložiště konektoru Azure Data Exploreru pro Apache Flink a sestavte ho místně. Tento přístup umožňuje ručně přidat konektor do místního úložiště Maven pomocí příkazu mvn clean install -DskipTests.

Z Flinku se můžete ověřit pomocí aplikace Microsoft Entra ID nebo spravované identity.

Tento instanční objekt bude identita, kterou konektor používá k zápisu dat do tabulky v Kusto. Později udělíte oprávnění pro tento instanční objekt pro přístup k prostředkům Kusto.

  1. Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.

    az login
    
  2. Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z vrácených dat JSON zkopírujte appIdpasswordtenant a pro budoucí použití.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Vytvořili jste aplikaci Microsoft Entra a instanční objekt.

  1. Udělte uživateli aplikace oprávnění k databázi:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Udělte aplikaci oprávnění správce nebo ingestoru v tabulce. Požadovaná oprávnění závisí na zvolené metodě zápisu dat. Oprávnění ingestoru jsou dostatečná pro SinkV2, zatímco WriteAndSink vyžaduje oprávnění správce.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Další informace o autorizaci najdete v tématu Řízení přístupu na základě role Kusto.

Zápis dat z Flinku:

  1. Import požadovaných možností:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. K ověření použijte svou aplikaci nebo spravovanou identitu.

    Pro ověřování aplikací:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Pro ověřování spravované identity:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Nakonfigurujte parametry jímky, jako je databáze a tabulka:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Můžete přidat další možnosti, jak je popsáno v následující tabulce:

    Možnost Popis Výchozí hodnota
    IngestionMappingRef Odkazuje na existující mapování příjmu dat.
    FlushImmediately Vyprázdní data okamžitě a může způsobit problémy s výkonem. Tato metoda se nedoporučuje.
    BatchIntervalMs Určuje, jak často se data vyprázdní. 30 sekund
    BatchSize Nastaví velikost dávky pro ukládání záznamů do vyrovnávací paměti před vyprázdněním. 1 000 záznamů
    ClientBatchSizeLimit Určuje velikost v MB agregovaných dat před příjmem dat. 300 MB
    PollForIngestionStatus Pokud je pravda, konektor se po vyprázdnění dat dotazuje na stav příjmu dat. false (nepravda)
    DeliveryGuarantee Určuje sémantiku záruky doručení. K dosažení přesně jednou sémantiky použijte WriteAheadSink. AT_LEAST_ONCE
  2. Zapisujte streamovaná data jedním z následujících způsobů:

    • SinkV2: Jedná se o bezstavovou možnost, která vyprázdní data na kontrolním bodu a zajišťuje alespoň jednou konzistenci. Tuto možnost doporučujeme pro příjem dat s velkým objemem dat.
    • WriteAheadSink: Tato metoda generuje data do KustoSinku. Je integrovaná se systémem kontrolních bodů Flink a nabízí přesně jednou záruky. Data jsou uložena v AbstraktníStateBackend a potvrzena až po dokončení kontrolního bodu.

    Následující příklad používá SinkV2. Chcete-li použít WriteAheadSink, použijte metodu buildWriteAheadSink místo build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Celý kód by měl vypadat přibližně takto:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Ověření ingestování dat

Po nakonfigurování připojení se data odesílají do tabulky. Spuštěním dotazu KQL můžete ověřit, že se data ingestují.

  1. Spuštěním následujícího dotazu ověřte, že se data ingestují do tabulky:

    <MyTable>
    | count
    
  2. Spuštěním následujícího dotazu zobrazte data:

    <MyTable>
    | take 100