Del via


Hent data fra Apache Spark

Apache Spark er en enhetlig analysemotor for databehandling i stor skala.

Kusto-koblingen for Spark er et åpen kilde prosjekt som kan kjøre på en hvilken som helst Spark-klynge. Den implementerer datakilde og datavask for å flytte data på tvers av Azure Data Explorer- og Spark-klynger. Ved hjelp av et Eventhouse og Apache Spark kan du bygge raske og skalerbare programmer rettet mot datadrevne scenarier. Maskinlæring (ML), Extract-Transform-Load (ETL) og Log Analytics. Med koblingen blir Eventhouses et gyldig datalager for standard Spark-kilde- og vaskoperasjoner, for eksempel skrive, lese og skrivestrøm.

Du kan skrive til Eventhouse via inninntak i kø eller strømming. Lesing fra Eventhouses støtter kolonnebeskjæring og predikat-pushdown, som filtrerer dataene i Eventhouse, noe som reduserer volumet av overførte data.

Denne artikkelen beskriver hvordan du installerer og konfigurerer Spark-koblingen og flytter data mellom en Eventhouse- og Apache Spark-klynge.

Merk

Selv om noen av eksemplene nedenfor refererer til en Azure Databricks Spark-klynge, tar ikke Spark-koblingen direkte avhengigheter av Databricks eller andre Spark-distribusjoner.

Forutsetning

Tips

Spark 2.3.x-versjoner støttes også, men kan kreve noen endringer i pom.xml avhengigheter.

Slik bygger du Spark-koblingen

Fra og med versjon 2.3.0 introduserer vi nye artefakt-ID-er som erstatter spark-kusto-kobling: kusto-spark_3.0_2.12 rettet mot Spark 3.x og Scala 2.12.

Merk

Versjoner før 2.5.1 fungerer ikke lenger for inntak til en eksisterende tabell. Oppdater til en senere versjon. Dette trinnet er valgfritt. Hvis du bruker forhåndsbygde biblioteker, for eksempel Maven, kan du se konfigurasjonen av Spark-klyngen.

Bygg forutsetninger

  1. Se denne kilden for å bygge Spark Connector.

  2. For Scala/Java-programmer som bruker Maven-prosjektdefinisjoner, kan du koble programmet til den nyeste artefakten. Finn den nyeste artefakten på Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Hvis du ikke bruker forhåndsbygde biblioteker, må du installere bibliotekene som er oppført i avhengigheter , inkludert følgende Kusto Java SDK-biblioteker . Hvis du vil finne riktig versjon å installere, kan du se i den aktuelle utgivelsens pom:

    1. Slik bygger du krukke og kjører alle tester:

      mvn clean package -DskipTests
      
    2. Hvis du vil bygge krukke, kjører du alle testene og installerer krukken i det lokale Maven-repositoriet:

      mvn clean install -DskipTests
      

Hvis du vil ha mer informasjon, kan du se koblingsbruk.

Spark-klyngeoppsett

Merk

Det anbefales å bruke den nyeste Kusto Spark-koblingsutgivelsen når du utfører følgende trinn.

  1. Konfigurer følgende spark-klyngeinnstillinger basert på Azure Databricks-klyngen Spark 3.0.1 og Scala 2.12:

    Innstillinger for Databricks-klynger.

  2. Installer det nyeste spark-kusto-koblingsbiblioteket fra Maven:

    Importer biblioteker. Velg Spark-Kusto-Connector.

  3. Kontroller at alle nødvendige biblioteker er installert:

    Kontroller at biblioteker er installert.

  4. Kontroller at andre avhengigheter ble installert for installasjon ved hjelp av en JAR-fil:

    Legg til avhengigheter.

Autentisering

Kusto Spark-kobling gjør det mulig å godkjenne med Microsoft Entra ID ved hjelp av én av følgende metoder:

Microsoft Entra-programgodkjenning

Microsoft Entra-programgodkjenning er den enkleste og vanligste godkjenningsmetoden og anbefales for Kusto Spark-koblingen.

  1. Logg på Azure-abonnementet via Azure CLI. Deretter godkjenner du i nettleseren.

    az login
    
  2. Velg abonnementet som vert for hovedstolen. Dette trinnet er nødvendig når du har flere abonnementer.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Opprett tjenestekontohaveren. I dette eksemplet kalles my-service-principaltjenestekontohaveren .

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopier , appIdog password for fremtidig bruk, tenantfra de returnerte JSON-dataene.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Du har opprettet Microsoft Entra-programmet og tjenestekontohaveren.

Spark-koblingen bruker følgende Entra-appegenskaper for godkjenning:

Egenskaper Alternativstreng Bekrivelse
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra-programidentifikator (klient).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Godkjenningsinstans for Microsoft Entra. Microsoft Entra Directory (tenant) ID. Valgfritt – standarder for microsoft.com. Hvis du vil ha mer informasjon, kan du se Microsoft Entra-myndighet.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra-programnøkkel for klienten.
KUSTO_ACCESS_TOKEN kustoAccessToken Hvis du allerede har et accessToken som er opprettet med tilgang til Kusto, kan dette også brukes til koblingen for godkjenning.

Merk

Eldre API-versjoner (mindre enn 2,0,0) har følgende navn: «kustoAADClientID», «kustoClientAADClientPassword», «kustoAADAuthorityID»

Kusto-rettigheter

Gi følgende rettigheter på kustosiden basert på Spark-operasjonen du vil utføre.

Spark-operasjon Rettigheter
Lese - enkel modus Leser
Les – Tving distribuert modus Leser
Skriv – kømodus med opprettingsalternativet CreateTableIfNotExist-tabell Administrator
Skriv – Kømodus med opprettingsalternativet FailIfNotExist-tabell Ingestor
Skriv – TransactionalMode Administrator

Hvis du vil ha mer informasjon om hovedroller, kan du se rollebasert tilgangskontroll. Hvis du vil administrere sikkerhetsroller, kan du se administrasjon av sikkerhetsroller.

Spark vask: skrive til Kusto

  1. Konfigurer synkeparametere:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Skriv Spark DataFrame til Kusto-klyngen som gruppe:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Eller bruk den forenklede syntaksen:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    // Optional, for any extra options:
    val conf: Map[String, String] = Map()
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Skriv data for strømming:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // As an alternative to adding .option by .option, you can provide a map:
    val conf: Map[String, String] = Map(
      KustoSinkOptions.KUSTO_CLUSTER -> cluster,
      KustoSinkOptions.KUSTO_TABLE -> table,
      KustoSinkOptions.KUSTO_DATABASE -> database,
      KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken)
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf)
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark kilde: lesing fra Kusto

  1. Når du leser små mengder data, definerer du dataspørringen:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Valgfritt: Hvis du oppgir midlertidig blob-lagring (og ikke Kusto), opprettes blobene under innringerens ansvar. Dette omfatter klargjøring av lagring, roterende tilgangstaster og sletting av midlertidige artefakter. KustoBlobStorageUtils-modulen inneholder hjelpefunksjoner for sletting av blober basert på konto- og beholderkoordinater og kontolegitimasjon, eller en fullstendig SAS-nettadresse med skrive-, lese- og listetillatelser. Når tilsvarende RDD ikke lenger er nødvendig, lagrer hver transaksjon forbigående blob-artefakter i en egen katalog. Denne katalogen registreres som en del av informasjonslogger for lesetransaksjoner som rapporteres på Spark Driver-noden.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    I eksemplet ovenfor får du ikke tilgang til nøkkelhvelvet ved hjelp av koblingsgrensesnittet. En enklere metode for å bruke Databricks-hemmelighetene brukes.

  3. Les fra Kusto.

    • Hvis du oppgir midlertidig blob-lagring, kan du lese fra Kusto på følgende måte:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Hvis Kusto leverer midlertidig blob-lagring, kan du lese fra Kusto på følgende måte:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)