Hent data fra Apache Spark
Apache Spark er en enhetlig analysemotor for databehandling i stor skala.
Kusto-koblingen for Spark er et åpen kilde prosjekt som kan kjøre på en hvilken som helst Spark-klynge. Den implementerer datakilde og datavask for å flytte data på tvers av Azure Data Explorer- og Spark-klynger. Ved hjelp av et Eventhouse og Apache Spark kan du bygge raske og skalerbare programmer rettet mot datadrevne scenarier. Maskinlæring (ML), Extract-Transform-Load (ETL) og Log Analytics. Med koblingen blir Eventhouses et gyldig datalager for standard Spark-kilde- og vaskoperasjoner, for eksempel skrive, lese og skrivestrøm.
Du kan skrive til Eventhouse via inninntak i kø eller strømming. Lesing fra Eventhouses støtter kolonnebeskjæring og predikat-pushdown, som filtrerer dataene i Eventhouse, noe som reduserer volumet av overførte data.
Denne artikkelen beskriver hvordan du installerer og konfigurerer Spark-koblingen og flytter data mellom en Eventhouse- og Apache Spark-klynge.
Merk
Selv om noen av eksemplene nedenfor refererer til en Azure Databricks Spark-klynge, tar ikke Spark-koblingen direkte avhengigheter av Databricks eller andre Spark-distribusjoner.
Forutsetning
- Et Azure-abonnement. Opprett en kostnadsfri Azure-konto. Dette brukes til godkjenning ved hjelp av Microsoft Entra ID.
- En KQL-database i Microsoft Fabric. Kopier URI-en for denne databasen ved hjelp av instruksjonene i Access, en eksisterende KQL-database.
- En Spark-klynge
- Installer koblingsbibliotek:
- Forhåndsbygde biblioteker for Spark 2.4+Scala 2.11 eller Spark 3+scala 2.12
- Maven-repo
- Maven 3.x installert
Tips
Spark 2.3.x-versjoner støttes også, men kan kreve noen endringer i pom.xml avhengigheter.
Slik bygger du Spark-koblingen
Fra og med versjon 2.3.0 introduserer vi nye artefakt-ID-er som erstatter spark-kusto-kobling: kusto-spark_3.0_2.12 rettet mot Spark 3.x og Scala 2.12.
Merk
Versjoner før 2.5.1 fungerer ikke lenger for inntak til en eksisterende tabell. Oppdater til en senere versjon. Dette trinnet er valgfritt. Hvis du bruker forhåndsbygde biblioteker, for eksempel Maven, kan du se konfigurasjonen av Spark-klyngen.
Bygg forutsetninger
Se denne kilden for å bygge Spark Connector.
For Scala/Java-programmer som bruker Maven-prosjektdefinisjoner, kan du koble programmet til den nyeste artefakten. Finn den nyeste artefakten på Maven Central.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
Hvis du ikke bruker forhåndsbygde biblioteker, må du installere bibliotekene som er oppført i avhengigheter , inkludert følgende Kusto Java SDK-biblioteker . Hvis du vil finne riktig versjon å installere, kan du se i den aktuelle utgivelsens pom:
Slik bygger du krukke og kjører alle tester:
mvn clean package -DskipTests
Hvis du vil bygge krukke, kjører du alle testene og installerer krukken i det lokale Maven-repositoriet:
mvn clean install -DskipTests
Hvis du vil ha mer informasjon, kan du se koblingsbruk.
Spark-klyngeoppsett
Merk
Det anbefales å bruke den nyeste Kusto Spark-koblingsutgivelsen når du utfører følgende trinn.
Konfigurer følgende spark-klyngeinnstillinger basert på Azure Databricks-klyngen Spark 3.0.1 og Scala 2.12:
Installer det nyeste spark-kusto-koblingsbiblioteket fra Maven:
Kontroller at alle nødvendige biblioteker er installert:
Kontroller at andre avhengigheter ble installert for installasjon ved hjelp av en JAR-fil:
Autentisering
Kusto Spark-kobling gjør det mulig å godkjenne med Microsoft Entra ID ved hjelp av én av følgende metoder:
- Et Microsoft Entra-program
- Et Microsoft Entra-tilgangstoken
- Enhetsgodkjenning (for scenarioer for manglende utvikling)
- Et Azure Key Vault Hvis du vil ha tilgang til Key Vault-ressursen, installerer du azure-keyvault-pakken og gir programlegitimasjon.
Microsoft Entra-programgodkjenning
Microsoft Entra-programgodkjenning er den enkleste og vanligste godkjenningsmetoden og anbefales for Kusto Spark-koblingen.
Logg på Azure-abonnementet via Azure CLI. Deretter godkjenner du i nettleseren.
az login
Velg abonnementet som vert for hovedstolen. Dette trinnet er nødvendig når du har flere abonnementer.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Opprett tjenestekontohaveren. I dette eksemplet kalles
my-service-principal
tjenestekontohaveren .az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Kopier ,
appId
ogpassword
for fremtidig bruk,tenant
fra de returnerte JSON-dataene.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Du har opprettet Microsoft Entra-programmet og tjenestekontohaveren.
Spark-koblingen bruker følgende Entra-appegenskaper for godkjenning:
Egenskaper | Alternativstreng | Bekrivelse |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra-programidentifikator (klient). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Godkjenningsinstans for Microsoft Entra. Microsoft Entra Directory (tenant) ID. Valgfritt – standarder for microsoft.com. Hvis du vil ha mer informasjon, kan du se Microsoft Entra-myndighet. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra-programnøkkel for klienten. |
KUSTO_ACCESS_TOKEN | kustoAccessToken | Hvis du allerede har et accessToken som er opprettet med tilgang til Kusto, kan dette også brukes til koblingen for godkjenning. |
Merk
Eldre API-versjoner (mindre enn 2,0,0) har følgende navn: «kustoAADClientID», «kustoClientAADClientPassword», «kustoAADAuthorityID»
Kusto-rettigheter
Gi følgende rettigheter på kustosiden basert på Spark-operasjonen du vil utføre.
Spark-operasjon | Rettigheter |
---|---|
Lese - enkel modus | Leser |
Les – Tving distribuert modus | Leser |
Skriv – kømodus med opprettingsalternativet CreateTableIfNotExist-tabell | Administrator |
Skriv – Kømodus med opprettingsalternativet FailIfNotExist-tabell | Ingestor |
Skriv – TransactionalMode | Administrator |
Hvis du vil ha mer informasjon om hovedroller, kan du se rollebasert tilgangskontroll. Hvis du vil administrere sikkerhetsroller, kan du se administrasjon av sikkerhetsroller.
Spark vask: skrive til Kusto
Konfigurer synkeparametere:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Skriv Spark DataFrame til Kusto-klyngen som gruppe:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Eller bruk den forenklede syntaksen:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ // Optional, for any extra options: val conf: Map[String, String] = Map() val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Skriv data for strømming:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // As an alternative to adding .option by .option, you can provide a map: val conf: Map[String, String] = Map( KustoSinkOptions.KUSTO_CLUSTER -> cluster, KustoSinkOptions.KUSTO_TABLE -> table, KustoSinkOptions.KUSTO_DATABASE -> database, KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken) // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark kilde: lesing fra Kusto
Når du leser små mengder data, definerer du dataspørringen:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Valgfritt: Hvis du oppgir midlertidig blob-lagring (og ikke Kusto), opprettes blobene under innringerens ansvar. Dette omfatter klargjøring av lagring, roterende tilgangstaster og sletting av midlertidige artefakter. KustoBlobStorageUtils-modulen inneholder hjelpefunksjoner for sletting av blober basert på konto- og beholderkoordinater og kontolegitimasjon, eller en fullstendig SAS-nettadresse med skrive-, lese- og listetillatelser. Når tilsvarende RDD ikke lenger er nødvendig, lagrer hver transaksjon forbigående blob-artefakter i en egen katalog. Denne katalogen registreres som en del av informasjonslogger for lesetransaksjoner som rapporteres på Spark Driver-noden.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
I eksemplet ovenfor får du ikke tilgang til nøkkelhvelvet ved hjelp av koblingsgrensesnittet. En enklere metode for å bruke Databricks-hemmelighetene brukes.
Les fra Kusto.
Hvis du oppgir midlertidig blob-lagring, kan du lese fra Kusto på følgende måte:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Hvis Kusto leverer midlertidig blob-lagring, kan du lese fra Kusto på følgende måte:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)