Integration von Azure Data Explorer und Apache Flink®
Azure Data Explorer ist eine vollständig verwaltete, leistungsstarke Big Data Analytics-Plattform, die es einfach macht, hohe Datenmengen in nahezu Echtzeit zu analysieren.
ADX hilft Benutzern bei der Analyse großer Datenmengen aus Streaminganwendungen, Websites, IoT-Geräten usw. Die Integration von Apache Flink mit ADX hilft Ihnen, Echtzeitdaten zu verarbeiten und in ADX zu analysieren.
Voraussetzungen
Schritte zum Verwenden des Azure-Daten-Explorers als Sink in Flink
Erstellen Sie ADX mit der Datenbank und der Tabelle nach Bedarf.
Fügen Sie Ingestor-Berechtigungen für die verwaltete Identität in Kusto hinzu.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Führen Sie ein Beispielprogramm aus, das die URI (Uniform Resource Identifier) des Kusto-Clusters, die Datenbank, die verwaltete Identität und die Tabelle definiert, in die geschrieben werden muss.
Klonen Sie das Projekt "flink-connector-kusto": https://github.com/Azure/flink-connector-kusto.git
Erstellen der Tabelle in ADX mithilfe des folgenden Befehls
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Aktualisieren Sie die Datei FlinkKustoSinkSample.java mit der richtigen Kusto-Cluster-URI, der Datenbank und der verwendeten verwalteten Identität.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Erstellen Sie das Projekt später mit "mvn clean package"
Suchen Sie die JAR-Datei mit dem Namen "samples-java-1.0-SNAPSHOT-shaded.jar" unter dem Ordner "sample-java/target", laden Sie diese JAR-Datei in die Flink-Benutzeroberfläche hoch, und übermitteln Sie den Auftrag.
Die Kusto-Tabelle abfragen, um die Ausgabe zu überprüfen
Es gibt keine Verzögerung beim Schreiben der Daten in die Kusto-Tabelle aus Flink.
Referenz
- Apache Flink Website
- Apache, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).