Integrace Azure Data Exploreru a Apache Flinku®
Azure Data Explorer je plně spravovaná vysoce výkonná analytická platforma pro velké objemy dat, která umožňuje snadno analyzovat velké objemy dat téměř v reálném čase.
ADX pomáhá uživatelům analyzovat velké objemy dat ze streamovaných aplikací, webů, zařízení IoT atd. Integrace Apache Flinku s ADX pomáhá zpracovávat data v reálném čase a analyzovat je v ADX.
Požadavky
- Vytvoření clusteru Apache Flink v HDInsight na AKS
- Vytvořit Průzkumníka dat Azure
Postup použití Azure Data Exploreru jako jímky v Flinku
vytvořit ADX s databázovými a tabulkami podle potřeby.
Přidejte oprávnění ingestoru pro spravovanou identitu v Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Spusťte ukázkový program definující identifikátor URI clusteru Kusto (Uniform Resource Identifier), použitou databázi a spravovanou identitu a tabulku, do které se musí zapisovat.
Naklonujte projekt flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git
Vytvoření tabulky v ADX pomocí následujícího příkazu
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Aktualizujte soubor FlinkKustoSinkSample.java správným URI clusteru Kusto, správnou databází a používanou spravovanou identitou.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Později sestavte projekt pomocí "čistého balíčku mvn".
Vyhledejte soubor JAR s názvem "samples-java-1.0-SNAPSHOT-shaded.jar" ve složce sample-java/target a pak tento soubor JAR nahrajte do uživatelského rozhraní Flink a odešlete úlohu.
Ověření výstupu dotazem na tabulku Kusto
Nedochází ke zpoždění při zápisu dat do tabulky Kusto z Flinku.
Odkaz
- webu Apache Flink
- Názvy projektů Apache, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).