Freigeben über


Azure Data Explorer-Connector für Apache Spark

Apache Spark ist eine vereinheitlichte Engine zur Verarbeitung von umfangreichen Daten. Azure Data Explorer ist ein schneller, vollständig verwalteter Datenanalysedienst für Echtzeitanalysen großer Mengen an Daten.

Der Kusto-Connector für Spark ist ein Open-Source-Projekt, das in einem beliebigen Spark-Cluster ausgeführt werden kann. Er implementiert Datenquellen und Datensenken zum Verschieben von Daten zwischen Azure Data Explorer und Spark-Clustern. Mit Azure Data Explorer und Apache Spark können Sie schnelle und skalierbare Anwendungen für datengesteuerte Szenarien erstellen. Beispiele dafür sind maschinelles Lernen (Machine Learning, ML), Extrahieren, Transformieren und Laden (Extract-Transform-Load, ETL) und Protokollanalysen (Log Analytics). Durch den Connector wird Azure Data Explorer ein gültiger Datenspeicher für Spark-Standardvorgänge für Quellen und Senken wie„write“, „read“ und „writeStream“.

Sie können über die Warteschlangen- oder Streaming-Erfassung in Azure Data Explorer schreiben. Das Lesen aus Azure Data Explorer unterstützt das Löschen von Spalten und die Prädikatweitergabe. Dabei werden die Daten in Azure Data Explorer gefiltert, wodurch die Menge der übertragenen Daten verringert wird.

Hinweis

Informationen zum Arbeiten mit dem Synapse Spark-Connector für Azure Data Explorer finden Sie unter Verbinden mit Azure Data Explorer mithilfe von Apache Spark für Azure Synapse Analytics.

In diesem Thema wird beschrieben, wie Sie den Azure Data Explorer-Connector für Spark installieren und konfigurieren und Daten zwischen Azure Data Explorer und Apache Spark-Clustern verschieben.

Hinweis

Obwohl sich einige der unten stehenden Beispiele auf einen Azure Databricks Spark-Cluster beziehen, akzeptiert Azure Data Explorer-Connector für Spark keine direkten Abhängigkeiten von Databricks oder einer anderen Spark-Distribution.

Voraussetzungen

Tipp

Spark 2.3.x-Versionen werden ebenfalls unterstützt, erfordern aber möglicherweise Änderungen in den pom.xml-Abhängigkeiten.

Erstellen des Spark-Connectors

Ab Version 2.3.0 werden neue Artefakt-IDs eingeführt, die „spark-kusto-connector ersetzen“: kusto-spark_3.0_2.12 für Spark 3.x und Scala 2.12.

Hinweis

Versionen vor 2.5.1 funktionieren nicht mehr für die Erfassung in einer vorhandenen Tabelle. Führen Sie die Aktualisierung auf eine höhere Version durch. Dieser Schritt ist optional. Wenn Sie vorgefertigte Bibliotheken (etwa Maven) verwenden, finden Sie weitere Informationen unter Einrichtung des Spark-Clusters.

Buildvoraussetzungen

  1. Ziehe Sie diese Quelle zum Erstellen des Spark-Connectors zurate.

  2. Wenn Sie Scala- und Java-Anwendungen mit Maven-Projektdefinitionen verwenden, verknüpfen Sie Ihre Anwendung mit dem neuesten Artefakt: Suchen Sie das neueste Artefakt unter Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Wenn Sie keine vorgefertigten Bibliotheken verwenden, müssen Sie die unter Abhängigkeiten aufgeführten Bibliotheken installieren, einschließlich der folgenden Kusto Java SDK-Bibliotheken. Die richtige Version für die Installation finden Sie in der POM-Datei des entsprechenden Releases:

    1. Führen Sie den folgenden Befehl aus, um die JAR-Datei zu erstellen und alle Tests auszuführen:

      mvn clean package -DskipTests
      
    2. Um die JAR-Datei zu erstellen, führen Sie alle Tests aus, und installieren Sie die JAR-Datei in Ihrem lokalen Maven-Repository:

      mvn clean install -DskipTests
      

Weitere Informationen finden Sie unter Connectorverwendung.

Einrichtung des Spark-Clusters

Hinweis

Es wird empfohlen, die aktuelle Version des Kusto-Connectors für Spark zu verwenden, wenn Sie die folgenden Schritte ausführen.

  1. Konfigurieren Sie die folgenden Spark-Clustereinstellungen basierend auf dem Azure Databricks-Cluster unter Verwendung von Spark 3.0.1 und Scala 2.12:

    Databricks-Clustereinstellungen

  2. Installieren Sie die neueste Spark-Kusto-Connectorbibliothek von Maven:

    Importbibliotheken.Auswählen des Spark-Kusto-Connectors

  3. Überprüfen Sie, ob alle erforderlichen Bibliotheken installiert sind:

    Überprüfen der installierten Bibliotheken

  4. Überprüfen Sie bei der Installation mithilfe einer JAR-Datei, ob andere Abhängigkeiten installiert wurden:

    Hinzufügen von Abhängigkeiten

Authentifizierung

Mit dem Kusto-Connector können Sie sich mit einer der folgenden Methoden bei Microsoft Entra ID authentifizieren:

Microsoft Entra-Anwendung – Authentifizierung

Die Microsoft Entra-Anwendungsauthentifizierung ist die einfachste und gängigste Authentifizierungsmethode. Sie wird für den Kusto-Connector für Spark empfohlen.

  1. Melden Sie sich per Azure CLI an Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.

    az login
    
  2. Wählen Sie das Abonnement aus, um den Prinzipal zu hosten. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als my-service-principal bezeichnet.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieren Sie aus den zurückgegebenen JSON-Daten appId, password und tenant für die zukünftige Verwendung.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.

Der Spark-Connector verwendet die folgenden Entra-App-Eigenschaften für die Authentifizierung:

Eigenschaften Optionszeichenfolge Beschreibung
KUSTO_AAD_APP_ID kustoAadAppId Der (Client-)Bezeichner der Microsoft Entra-Anwendung.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra Autoritative Stelle. Microsoft Entra Verzeichnis (Mandant)-ID. Optional – Standardmäßig microsoft.com. Weitere Informationen finden Sie unter Microsoft Entra-Autoritative Stelle.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra-Anwendungsschlüssel für den Client.
KUSTO_ACCESS_TOKEN kustoAccessToken Wenn Sie bereits über ein Zugriffstoken verfügen, das mit Zugriff auf Kusto erstellt wird, kann dieses auch für die Authentifizierung an den Connector übergeben werden.

Hinweis

Ältere API-Versionen (vor 2.0.0) haben den folgenden Namen: „kustoAADClientID“, „kustoClientAADClientPassword“, „kustoAADAuthorityID“.

Kusto-Berechtigungen

Gewähren Sie den folgenden Berechtigungen auf Kusto-Seite basierend auf dem Spark-Vorgang, den Sie ausführen möchten.

Spark-Vorgang Berechtigungen
Lesen – Einzelmodus Leser
Lesen – verteilten Modus erzwingen Leser
Schreiben – Warteschlangenmodus mit CreateTableIfNotExist-Tabellenerstellungsoption Administrator
Schreiben – Warteschlangenmodus mit FailIfNotExist-Tabellenerstellungsoption Ingestor
Schreiben – TransactionalMode Administrator

Weitere Informationen zu Prinzipalrollen finden Sie unter Rollenbasierte Zugriffssteuerung. Informationen zum Verwalten von Sicherheitsrollen finden Sie unter Sicherheitsrollenverwaltung.

Spark-Senke: in Kusto schreiben

  1. Einrichten von Senkenparametern:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark-Dataframe als Batch in Kusto-Cluster schreiben:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Oder verwenden Sie die vereinfachte Syntax:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Schreiben von Streamingdaten:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-Quelle: aus Kusto lesen

  1. Wenn kleine Datenmengen gelesen werden, definieren Sie die Datenabfrage:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optional: Wenn Sie den temporären Blobspeicher (und nicht Kusto) bereitstellen, werden die Blobs in der Verantwortung des Aufrufers erstellt. Dies umfasst das Bereitstellen des Speichers, das Rotieren von Zugriffsschlüsseln und das Löschen temporärer Artefakte. Das KustoBlobStorageUtils-Modul enthält Hilfsfunktionen zum Löschen von Blobs, die entweder auf Konto- und Containerkoordinaten und Kontoanmeldeinformationen oder einer vollständigen SAS-URL mit Schreib-, Lese- und Listenberechtigungen basieren. Wenn das entsprechende RDD nicht mehr benötigt wird, speichert jede Transaktion temporäre Blobartefakte in einem separaten Verzeichnis. Dieses Verzeichnis wird als Teil der Lesetransaktionsinformationsprotokolle erfasst, die auf dem Spark-Treiberknoten gemeldet werden.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Im obigen Beispiel erfolgt der Zugriff auf den Schlüsseltresor nicht über die Connectorschnittstelle. Es wird eine einfachere Methode der Verwendung der Databricks-Geheimnisse verwendet.

  3. Lesen Sie aus Kusto.

    • Wenn Sie den vorübergehenden Blobspeicher bereitstellen, lesen Sie in Kusto wie folgt:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Wenn Kusto den vorübergehenden Blobspeicher bereitstellt, lesen Sie in Kusto wie folgt:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)