Příjem dat pomocí Apache Flinku do Azure Data Exploreru
Apache Flink je architektura a distribuovaný modul pro zpracování stavových výpočtů přes nevázané a vázané datové proudy.
Konektor Flink je opensourcový projekt , který se dá spustit v jakémkoli clusteru Flink. Implementuje datovou jímku pro přesun dat z clusteru Flink. Pomocí konektoru pro Apache Flink můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty, například strojové učení (ML), extrakce a transformace načítání (ETL) a Log Analytics.
V tomto článku se dozvíte, jak pomocí konektoru Flink odesílat data z Flinku do tabulky. Vytvoříte tabulku a mapování dat, nasměrujete Flink na odeslání dat do tabulky a pak ověříte výsledky.
Požadavky
- Cluster a databáze Azure Data Exploreru. Vytvoření clusteru a databáze nebo databáze KQL v sadě Microsoft Fabric v reálném čase
- Cílová tabulka v databázi. Viz Vytvoření tabulky v Azure Data Exploreru nebo Vytvoření tabulky v nástroji V reálném čase
- Cluster Apache Flink. Vytvořte cluster.
- Maven 3.x
Získání konektoru Flink
Pro projekty Flink, které ke správě závislostí používají Maven, integrujte jímku jádra konektoru Flink pro Azure Data Explorer přidáním jako závislosti:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
U projektů, které ke správě závislostí nepoužívají Maven, naklonujte úložiště konektoru Azure Data Exploreru pro Apache Flink a sestavte ho místně. Tento přístup umožňuje ručně přidat konektor do místního úložiště Maven pomocí příkazu mvn clean install -DskipTests
.
Z Flinku se můžete ověřit pomocí aplikace Microsoft Entra ID nebo spravované identity.
Tento instanční objekt bude identita, kterou konektor používá k zápisu dat do tabulky v Kusto. Později udělíte oprávnění pro tento instanční objekt pro přístup k prostředkům Kusto.
Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.
az login
Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z vrácených dat JSON zkopírujte
appId
password
tenant
a pro budoucí použití.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Vytvořili jste aplikaci Microsoft Entra a instanční objekt.
Udělte uživateli aplikace oprávnění k databázi:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Udělte aplikaci oprávnění správce nebo ingestoru v tabulce. Požadovaná oprávnění závisí na zvolené metodě zápisu dat. Oprávnění ingestoru jsou dostatečná pro SinkV2, zatímco WriteAndSink vyžaduje oprávnění správce.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Další informace o autorizaci najdete v tématu Řízení přístupu na základě role Kusto.
Zápis dat z Flinku
Zápis dat z Flinku:
Import požadovaných možností:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
K ověření použijte svou aplikaci nebo spravovanou identitu.
Pro ověřování aplikací:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Pro ověřování spravované identity:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Nakonfigurujte parametry jímky, jako je databáze a tabulka:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Můžete přidat další možnosti, jak je popsáno v následující tabulce:
Možnost Popis Výchozí hodnota IngestionMappingRef Odkazuje na existující mapování příjmu dat. FlushImmediately Vyprázdní data okamžitě a může způsobit problémy s výkonem. Tato metoda se nedoporučuje. BatchIntervalMs Určuje, jak často se data vyprázdní. 30 sekund BatchSize Nastaví velikost dávky pro ukládání záznamů do vyrovnávací paměti před vyprázdněním. 1 000 záznamů ClientBatchSizeLimit Určuje velikost v MB agregovaných dat před příjmem dat. 300 MB PollForIngestionStatus Pokud je pravda, konektor se po vyprázdnění dat dotazuje na stav příjmu dat. false (nepravda) DeliveryGuarantee Určuje sémantiku záruky doručení. K dosažení přesně jednou sémantiky použijte WriteAheadSink. AT_LEAST_ONCE Zapisujte streamovaná data jedním z následujících způsobů:
- SinkV2: Jedná se o bezstavovou možnost, která vyprázdní data na kontrolním bodu a zajišťuje alespoň jednou konzistenci. Tuto možnost doporučujeme pro příjem dat s velkým objemem dat.
- WriteAheadSink: Tato metoda generuje data do KustoSinku. Je integrovaná se systémem kontrolních bodů Flink a nabízí přesně jednou záruky. Data jsou uložena v AbstraktníStateBackend a potvrzena až po dokončení kontrolního bodu.
Následující příklad používá SinkV2. Chcete-li použít WriteAheadSink, použijte metodu
buildWriteAheadSink
místobuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Celý kód by měl vypadat přibližně takto:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Ověření ingestování dat
Po nakonfigurování připojení se data odesílají do tabulky. Spuštěním dotazu KQL můžete ověřit, že se data ingestují.
Spuštěním následujícího dotazu ověřte, že se data ingestují do tabulky:
<MyTable> | count
Spuštěním následujícího dotazu zobrazte data:
<MyTable> | take 100