Pozyskiwanie danych za pomocą funkcji Apache Flink do usługi Azure Data Explorer
Apache Flink to platforma i aparat przetwarzania rozproszonego na potrzeby obliczeń stanowych za pośrednictwem niezwiązanych i ograniczonych strumieni danych.
Łącznik Flink to projekt typu open source, który można uruchomić w dowolnym klastrze Flink. Implementuje ujście danych do przenoszenia danych z klastra Flink. Za pomocą łącznika do platformy Apache Flink można tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych, na przykład uczenia maszynowego (ML), wyodrębniania i przekształcania obciążenia (ETL) i usługi Log Analytics.
Z tego artykułu dowiesz się, jak używać łącznika Flink do wysyłania danych z linku Flink do tabeli. Utworzysz tabelę i mapowanie danych, przekieruj Flink, aby wysłać dane do tabeli, a następnie zweryfikujesz wyniki.
Wymagania wstępne
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danych lub bazę danych KQL w funkcji analizy w czasie rzeczywistym w usłudze Microsoft Fabric.
- Tabela docelowa w bazie danych. Zobacz Tworzenie tabeli w usłudze Azure Data Explorer lub Tworzenie tabeli w funkcji analizy w czasie rzeczywistym
- Klaster Apache Flink. Utwórz klaster.
- Maven 3.x
Pobieranie łącznika Flink
W przypadku projektów Flink korzystających z narzędzia Maven do zarządzania zależnościami zintegruj ujście podstawowego łącznika Flink dla usługi Azure Data Explorer , dodając ją jako zależność:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
W przypadku projektów, które nie używają narzędzia Maven do zarządzania zależnościami, sklonuj repozytorium łącznika usługi Azure Data Explorer dla platformy Apache Flink i skompiluj je lokalnie. Takie podejście umożliwia ręczne dodawanie łącznika do lokalnego repozytorium Maven przy użyciu polecenia mvn clean install -DskipTests
.
Możesz uwierzytelnić się za pomocą linku Flink do przy użyciu aplikacji Microsoft Entra ID lub tożsamości zarządzanej.
Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.
Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.
az login
Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z zwróconych danych JSON skopiuj wartości
appId
,password
itenant
do użycia w przyszłości.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Utworzono aplikację Microsoft Entra i jednostkę usługi.
Przyznaj użytkownikowi aplikacji uprawnienia do bazy danych:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Przyznaj aplikacji uprawnienia ingestor lub administratora w tabeli. Wymagane uprawnienia zależą od wybranej metody zapisywania danych. Uprawnienia ingestora są wystarczające dla sinkV2, podczas gdy writeAndSink wymaga uprawnień administratora.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Aby uzyskać więcej informacji na temat autoryzacji, zobacz Kusto role-based access control (Kontrola dostępu oparta na rolach w usłudze Kusto).
Zapisywanie danych z linku Flink
Aby zapisać dane z Flink:
Zaimportuj wymagane opcje:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Użyj swojej aplikacji lub tożsamości zarządzanej do uwierzytelniania.
W przypadku uwierzytelniania aplikacji:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
W przypadku uwierzytelniania tożsamości zarządzanej:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Skonfiguruj parametry ujścia, takie jak baza danych i tabela:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Więcej opcji można dodać zgodnie z opisem w poniższej tabeli:
Opcja opis Wartość domyślna IngestionMappingRef Odwołuje się do istniejącego mapowania pozyskiwania. FlushImmediately Natychmiast opróżnia dane i może powodować problemy z wydajnością. Ta metoda nie jest zalecana. BatchIntervalMs Określa częstotliwość opróżniania danych. 30 sekund BatchSize Ustawia rozmiar partii dla buforowania rekordów przed opróżnianiem. 1 000 rekordów ClientBatchSizeLimit Określa rozmiar w MB zagregowanych danych przed pozyskiwaniem. 300 MB PollForIngestionStatus Jeśli to prawda, łącznik sonduje stan pozyskiwania po opróżnieniu danych. fałsz DeliveryGuarantee Określa semantyka gwarancji dostarczania. Aby osiągnąć dokładnie raz semantyka, użyj metody WriteAheadSink. AT_LEAST_ONCE Zapisuj dane przesyłane strumieniowo przy użyciu jednej z następujących metod:
- SinkV2: jest to opcja bezstanowa, która opróżnia dane w punkcie kontrolnym, zapewniając co najmniej raz spójność. Zalecamy tę opcję w przypadku pozyskiwania dużych ilości danych.
- WriteAheadSink: ta metoda emituje dane do aplikacji KustoSink. Jest zintegrowany z systemem tworzenia punktów kontrolnych Flink i oferuje dokładnie jednokrotne gwarancje. Dane są przechowywane w obiekcie AbstractStateBackend i zatwierdzane tylko po zakończeniu punktu kontrolnego.
W poniższym przykładzie użyto kodu SinkV2. Aby użyć metody WriteAheadSink, użyj
buildWriteAheadSink
metody zamiastbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Pełny kod powinien wyglądać mniej więcej tak:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Sprawdź, czy dane są pozyskiwane
Po skonfigurowaniu połączenia dane są wysyłane do tabeli. Możesz sprawdzić, czy dane są pozyskiwane, uruchamiając zapytanie KQL.
Uruchom następujące zapytanie, aby sprawdzić, czy dane są pozyskiwane do tabeli:
<MyTable> | count
Uruchom następujące zapytanie, aby wyświetlić dane:
<MyTable> | take 100