Azure Data Explorer-Connector für Apache Spark
Apache Spark ist eine vereinheitlichte Engine zur Verarbeitung von umfangreichen Daten. Azure Data Explorer ist ein schneller, vollständig verwalteter Datenanalysedienst für Echtzeitanalysen großer Mengen an Daten.
Der Kusto-Connector für Spark ist ein Open-Source-Projekt, das in einem beliebigen Spark-Cluster ausgeführt werden kann. Er implementiert Datenquellen und Datensenken zum Verschieben von Daten zwischen Azure Data Explorer und Spark-Clustern. Mit Azure Data Explorer und Apache Spark können Sie schnelle und skalierbare Anwendungen für datengesteuerte Szenarien erstellen. Beispiele dafür sind maschinelles Lernen (Machine Learning, ML), Extrahieren, Transformieren und Laden (Extract-Transform-Load, ETL) und Protokollanalysen (Log Analytics). Durch den Connector wird Azure Data Explorer ein gültiger Datenspeicher für Spark-Standardvorgänge für Quellen und Senken wie„write“, „read“ und „writeStream“.
Sie können über die Warteschlangen- oder Streaming-Erfassung in Azure Data Explorer schreiben. Das Lesen aus Azure Data Explorer unterstützt das Löschen von Spalten und die Prädikatweitergabe. Dabei werden die Daten in Azure Data Explorer gefiltert, wodurch die Menge der übertragenen Daten verringert wird.
Hinweis
Informationen zum Arbeiten mit dem Synapse Spark-Connector für Azure Data Explorer finden Sie unter Verbinden mit Azure Data Explorer mithilfe von Apache Spark für Azure Synapse Analytics.
In diesem Thema wird beschrieben, wie Sie den Azure Data Explorer-Connector für Spark installieren und konfigurieren und Daten zwischen Azure Data Explorer und Apache Spark-Clustern verschieben.
Hinweis
Obwohl sich einige der unten stehenden Beispiele auf einen Azure Databricks Spark-Cluster beziehen, akzeptiert Azure Data Explorer-Connector für Spark keine direkten Abhängigkeiten von Databricks oder einer anderen Spark-Distribution.
Voraussetzungen
- Ein Azure-Abonnement. Erstellen Sie ein kostenloses Azure-Konto.
- Schnellstart: Erstellen eines Azure Data Explorer-Clusters und einer Datenbank. Erstellen eines Clusters und einer Datenbank
- Ein Spark-Cluster
- Connectorbibliothek installieren:
- Vordefinierte Bibliotheken für Spark 2.4 und Scala 2.11 oder Spark 3 und Scala 2.12
- Maven-Repository
- Maven 3.x-Installation
Tipp
Spark 2.3.x-Versionen werden ebenfalls unterstützt, erfordern aber möglicherweise Änderungen in den pom.xml-Abhängigkeiten.
Erstellen des Spark-Connectors
Ab Version 2.3.0 werden neue Artefakt-IDs eingeführt, die „spark-kusto-connector ersetzen“: kusto-spark_3.0_2.12 für Spark 3.x und Scala 2.12.
Hinweis
Versionen vor 2.5.1 funktionieren nicht mehr für die Erfassung in einer vorhandenen Tabelle. Führen Sie die Aktualisierung auf eine höhere Version durch. Dieser Schritt ist optional. Wenn Sie vorgefertigte Bibliotheken (etwa Maven) verwenden, finden Sie weitere Informationen unter Einrichtung des Spark-Clusters.
Buildvoraussetzungen
Ziehe Sie diese Quelle zum Erstellen des Spark-Connectors zurate.
Wenn Sie Scala- und Java-Anwendungen mit Maven-Projektdefinitionen verwenden, verknüpfen Sie Ihre Anwendung mit dem neuesten Artefakt: Suchen Sie das neueste Artefakt unter Maven Central.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
Wenn Sie keine vorgefertigten Bibliotheken verwenden, müssen Sie die unter Abhängigkeiten aufgeführten Bibliotheken installieren, einschließlich der folgenden Kusto Java SDK-Bibliotheken. Die richtige Version für die Installation finden Sie in der POM-Datei des entsprechenden Releases:
Führen Sie den folgenden Befehl aus, um die JAR-Datei zu erstellen und alle Tests auszuführen:
mvn clean package -DskipTests
Um die JAR-Datei zu erstellen, führen Sie alle Tests aus, und installieren Sie die JAR-Datei in Ihrem lokalen Maven-Repository:
mvn clean install -DskipTests
Weitere Informationen finden Sie unter Connectorverwendung.
Einrichtung des Spark-Clusters
Hinweis
Es wird empfohlen, die aktuelle Version des Kusto-Connectors für Spark zu verwenden, wenn Sie die folgenden Schritte ausführen.
Konfigurieren Sie die folgenden Spark-Clustereinstellungen basierend auf dem Azure Databricks-Cluster unter Verwendung von Spark 3.0.1 und Scala 2.12:
Installieren Sie die neueste Spark-Kusto-Connectorbibliothek von Maven:
Überprüfen Sie, ob alle erforderlichen Bibliotheken installiert sind:
Überprüfen Sie bei der Installation mithilfe einer JAR-Datei, ob andere Abhängigkeiten installiert wurden:
Authentifizierung
Mit dem Kusto-Connector können Sie sich mit einer der folgenden Methoden bei Microsoft Entra ID authentifizieren:
- Eine Microsoft Entra-Anwendung
- Ein Microsoft Entra-Zugriffstoken
- Geräteauthentifizierung (in Nicht-Produktionsszenarios)
- Azure Key Vault: Um auf die Key Vault-Ressource zugreifen zu können, müssen Sie das „azure-keyvault“-Paket installieren und die Anwendungsanmeldeinformationen bereitstellen.
Microsoft Entra-Anwendung – Authentifizierung
Die Microsoft Entra-Anwendungsauthentifizierung ist die einfachste und gängigste Authentifizierungsmethode. Sie wird für den Kusto-Connector für Spark empfohlen.
Melden Sie sich per Azure CLI an Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.
az login
Wählen Sie das Abonnement aus, um den Prinzipal zu hosten. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als
my-service-principal
bezeichnet.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Kopieren Sie aus den zurückgegebenen JSON-Daten
appId
,password
undtenant
für die zukünftige Verwendung.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.
Der Spark-Connector verwendet die folgenden Entra-App-Eigenschaften für die Authentifizierung:
Eigenschaften | Optionszeichenfolge | Beschreibung |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Der (Client-)Bezeichner der Microsoft Entra-Anwendung. |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra Autoritative Stelle. Microsoft Entra Verzeichnis (Mandant)-ID. Optional – Standardmäßig microsoft.com. Weitere Informationen finden Sie unter Microsoft Entra-Autoritative Stelle. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra-Anwendungsschlüssel für den Client. |
KUSTO_ACCESS_TOKEN | kustoAccessToken | Wenn Sie bereits über ein Zugriffstoken verfügen, das mit Zugriff auf Kusto erstellt wird, kann dieses auch für die Authentifizierung an den Connector übergeben werden. |
Hinweis
Ältere API-Versionen (vor 2.0.0) haben den folgenden Namen: „kustoAADClientID“, „kustoClientAADClientPassword“, „kustoAADAuthorityID“.
Kusto-Berechtigungen
Gewähren Sie den folgenden Berechtigungen auf Kusto-Seite basierend auf dem Spark-Vorgang, den Sie ausführen möchten.
Spark-Vorgang | Berechtigungen |
---|---|
Lesen – Einzelmodus | Leser |
Lesen – verteilten Modus erzwingen | Leser |
Schreiben – Warteschlangenmodus mit CreateTableIfNotExist-Tabellenerstellungsoption | Administrator |
Schreiben – Warteschlangenmodus mit FailIfNotExist-Tabellenerstellungsoption | Ingestor |
Schreiben – TransactionalMode | Administrator |
Weitere Informationen zu Prinzipalrollen finden Sie unter Rollenbasierte Zugriffssteuerung. Informationen zum Verwalten von Sicherheitsrollen finden Sie unter Sicherheitsrollenverwaltung.
Spark-Senke: in Kusto schreiben
Einrichten von Senkenparametern:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Spark-Dataframe als Batch in Kusto-Cluster schreiben:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Oder verwenden Sie die vereinfachte Syntax:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Schreiben von Streamingdaten:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark-Quelle: aus Kusto lesen
Wenn kleine Datenmengen gelesen werden, definieren Sie die Datenabfrage:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Optional: Wenn Sie den temporären Blobspeicher (und nicht Kusto) bereitstellen, werden die Blobs in der Verantwortung des Aufrufers erstellt. Dies umfasst das Bereitstellen des Speichers, das Rotieren von Zugriffsschlüsseln und das Löschen temporärer Artefakte. Das KustoBlobStorageUtils-Modul enthält Hilfsfunktionen zum Löschen von Blobs, die entweder auf Konto- und Containerkoordinaten und Kontoanmeldeinformationen oder einer vollständigen SAS-URL mit Schreib-, Lese- und Listenberechtigungen basieren. Wenn das entsprechende RDD nicht mehr benötigt wird, speichert jede Transaktion temporäre Blobartefakte in einem separaten Verzeichnis. Dieses Verzeichnis wird als Teil der Lesetransaktionsinformationsprotokolle erfasst, die auf dem Spark-Treiberknoten gemeldet werden.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
Im obigen Beispiel erfolgt der Zugriff auf den Schlüsseltresor nicht über die Connectorschnittstelle. Es wird eine einfachere Methode der Verwendung der Databricks-Geheimnisse verwendet.
Lesen Sie aus Kusto.
Wenn Sie den vorübergehenden Blobspeicher bereitstellen, lesen Sie in Kusto wie folgt:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Wenn Kusto den vorübergehenden Blobspeicher bereitstellt, lesen Sie in Kusto wie folgt:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)