Integracja usług Azure Data Explorer i Apache Flink®
Azure Data Explorer to w pełni zarządzana, wysokiej wydajności platforma do analizy danych big data, która ułatwia analizowanie dużych ilości danych niemal w czasie rzeczywistym.
Usługa ADX ułatwia użytkownikom analizowanie dużych ilości danych z aplikacji przesyłania strumieniowego, witryn internetowych, urządzeń IoT itp. Integracja platformy Apache Flink z usługą ADX pomaga przetwarzać dane w czasie rzeczywistym i analizować je w usłudze ADX.
Wymagania wstępne
- Tworzenie klastra Apache Flink w usłudze HDInsight w usłudze AKS
- Tworzenie eksploratora danych platformy Azure
Kroki używania usługi Azure Data Explorer jako ujścia w funkcji Flink
Utwórz klaster Flink.
Utwórz usługę ADX z bazą danych i tabelą zgodnie z wymaganiami.
Dodaj uprawnienia ingestor dla tożsamości zarządzanej w usłudze Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Uruchom przykładowy program definiujący identyfikator URI klastra Kusto (uniform Resource Identifier), bazę danych i używaną tożsamość zarządzaną oraz tabelę, do których musi zapisywać dane.
Sklonuj projekt flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git
Utwórz tabelę w usłudze ADX przy użyciu następującego polecenia
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Zaktualizuj plik FlinkKustoSinkSample.java przy użyciu odpowiedniego identyfikatora URI klastra Kusto, bazy danych i używanej tożsamości zarządzanej.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Później skompiluj projekt przy użyciu "pakietu clean mvn"
Znajdź plik JAR o nazwie "samples-java-1.0-SNAPSHOT-shaded.jar" w folderze "sample-java/target", a następnie przekaż ten plik JAR w interfejsie użytkownika Flink i prześlij zadanie.
Wykonywanie zapytań względem tabeli Kusto w celu zweryfikowania danych wyjściowych
Nie ma opóźnienia podczas zapisywania danych w tabeli Kusto z funkcji Flink.
Odwołanie
- Witryna internetowa platformy Apache Flink
- Nazwy projektów apache, Apache Flink, Flink i skojarzone z nimi są znakami towarowymi programu Apache Software Foundation (ASF).