Freigeben über


Integration von Azure Data Explorer und Apache Flink®

Azure Data Explorer ist eine vollständig verwaltete, leistungsstarke Big Data Analytics-Plattform, die es einfach macht, hohe Datenmengen in nahezu Echtzeit zu analysieren.

ADX hilft Benutzern bei der Analyse großer Datenmengen aus Streaminganwendungen, Websites, IoT-Geräten usw. Die Integration von Apache Flink mit ADX hilft Ihnen, Echtzeitdaten zu verarbeiten und in ADX zu analysieren.

Voraussetzungen

  1. Erstellen eines Flink-Clusters.

  2. Erstellen Sie ADX mit der Datenbank und der Tabelle nach Bedarf.

  3. Fügen Sie Ingestor-Berechtigungen für die verwaltete Identität in Kusto hinzu.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Führen Sie ein Beispielprogramm aus, das die URI (Uniform Resource Identifier) des Kusto-Clusters, die Datenbank, die verwaltete Identität und die Tabelle definiert, in die geschrieben werden muss.

  5. Klonen Sie das Projekt "flink-connector-kusto": https://github.com/Azure/flink-connector-kusto.git

  6. Erstellen der Tabelle in ADX mithilfe des folgenden Befehls

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Aktualisieren Sie die Datei FlinkKustoSinkSample.java mit der richtigen Kusto-Cluster-URI, der Datenbank und der verwendeten verwalteten Identität.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Erstellen Sie das Projekt später mit "mvn clean package"

  8. Suchen Sie die JAR-Datei mit dem Namen "samples-java-1.0-SNAPSHOT-shaded.jar" unter dem Ordner "sample-java/target", laden Sie diese JAR-Datei in die Flink-Benutzeroberfläche hoch, und übermitteln Sie den Auftrag.

  9. Die Kusto-Tabelle abfragen, um die Ausgabe zu überprüfen

    Screenshot zeigt die Abfrage der Kusto-Tabelle, um die Ausgabe zu überprüfen.

    Es gibt keine Verzögerung beim Schreiben der Daten in die Kusto-Tabelle aus Flink.

Referenz